the class/import (e.g., `ioredis`, `redis`, `postgres`, etc.)
the provider connection options. These are specific to the chosen package (refer to its docs!); each package uses different property names and structures to connect.
Optional search: WorkflowSearchOptions - the search options for JSON-based configuration of the backend search module (e.g., Redis FT.SEARCH)
// Example 1) Instantiate MeshData with `ioredis`
import Redis from 'ioredis';
const meshData = new MeshData(Redis, {
host: 'localhost',
port: 6379,
password: 'shhh123',
db: 0,
});
// Example 2) Instantiate MeshData with `redis`
import * as Redis from 'redis';
const meshData = new MeshData(Redis, {
url: 'redis://:shhh123@localhost:6379'
});
// Instantiate MeshData with `postgres`
//...
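The elided `postgres` example can be sketched along the same lines as Examples 1 and 2. This is a hypothetical sketch: the option names below are the standard `pg` (node-postgres) client connection options, and the database name is invented; consult the provider's docs for the exact shape.

```typescript
// Hypothetical: instantiate MeshData with `postgres`.
// The option names are standard `pg` (node-postgres) client options;
// verify the exact shape against the MeshData and `pg` docs for your versions.
const pgOptions = {
  host: 'localhost',
  port: 5432,
  user: 'postgres',
  password: 'shhh123',
  database: 'hotmesh', // hypothetical database name
};
// Passed alongside the provider class, mirroring Examples 1 and 2:
// import { Client } from 'pg';
// const meshData = new MeshData(Client, pgOptions);
console.log(Object.keys(pgOptions).length); // 5
```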
Exposes the service mesh control plane through the mesh 'events' (pub/sub) system. This is useful for monitoring and managing the operational data layer.
publishes a message to the mesh control plane
the message payload
connection options
subscribes to the mesh control plane
the callback function
connection options
unsubscribes from the mesh control plane
the callback function
connection options
Search backend configuration (indexed/searchable fields and types)
Static
proxy - Wrap activities in a proxy that will durably run them, once.
Static
workflow - Provides a set of static extensions that can be invoked by your linked workflow functions during their execution.
Starts a new, subordinated workflow/job execution. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.
Starts a new, subordinated workflow/job execution. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.
Returns the current workflow context restored from Redis
Spawns a hook from either the main thread or a hook thread with the provided options; workflowId/taskQueue/name are optional and will default to the current workflowId/workflowTopic if not provided
the hook options
Interrupts a job by its entity and id.
Returns a random number between 0 and 1. This number is deterministic and will never vary for a given seed. This is useful for randomizing pathways in a workflow that can be safely replayed.
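To illustrate the replay-safety property, here is a minimal sketch of a seeded PRNG (mulberry32). It is not the library's actual algorithm, only a demonstration that a fixed seed always yields the same value, which is what makes randomized pathways safe to replay.

```typescript
// Illustrative sketch: a seeded, deterministic PRNG (mulberry32).
// NOT the library's implementation; it only demonstrates that a given
// seed always produces the same sequence, so replays are deterministic.
function mulberry32(seed: number): () => number {
  let a = seed >>> 0;
  return () => {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = Math.imul(a ^ (a >>> 15), a | 1);
    t = (t + Math.imul(t ^ (t >>> 7), t | 61)) ^ t;
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

const first = mulberry32(42)();
const replayed = mulberry32(42)();
console.log(first === replayed); // true: same seed, same value on replay
```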
Sends signal data into any other paused thread (which is currently awaiting the signal)
the signal id
the signal data
Sleeps the workflow for a duration. As the function is reentrant, upon reentry, the function will traverse prior execution paths up until the sleep command and then resume execution thereafter.
See the `ms` package for syntax examples: '1 minute', '2 hours', '3 days'
Sleeps the workflow for a duration. As the function is reentrant, upon reentry, the function will traverse prior execution paths up until the sleep command and then resume execution thereafter.
See the `ms` package for syntax examples: '1 minute', '2 hours', '3 days'
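The `ms`-style duration syntax can be illustrated with a toy converter. This sketch covers only the three documented example forms; the real `ms` package supports many more.

```typescript
// Toy converter for the documented duration strings (illustrative only;
// the real `ms` package handles many additional forms and abbreviations).
function toMillis(duration: string): number {
  const match = /^(\d+)\s*(second|minute|hour|day)s?$/.exec(duration.trim());
  if (!match) throw new Error(`unsupported duration: ${duration}`);
  const unit: Record<string, number> = {
    second: 1_000,
    minute: 60_000,
    hour: 3_600_000,
    day: 86_400_000,
  };
  return Number(match[1]) * unit[match[2]];
}

console.log(toMillis('1 minute')); // 60000
console.log(toMillis('2 hours'));  // 7200000
console.log(toMillis('3 days'));   // 259200000
```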
Starts a new, subordinated workflow/job execution, awaiting only the jobId, namely, the confirmation that the subordinated job has begun. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.
async function greet(email: string, user: { first: string }) {
//persist the user's email and newsletter preferences
const search = await MeshData.workflow.search();
await search.set('email', email, 'newsletter', 'yes');
//hook a function to send a newsletter
await MeshData.workflow.hook({
entity: 'user.newsletter',
args: [email]
});
return `Hello, ${user.first}. Your email is [${email}].`;
}
Returns the remote function state for all fields. NOTE: `all` can be less efficient than calling `get`, as it returns all fields (HGETALL), not just the ones requested (HMGET). Depending upon the duration of the workflow, this could represent a large amount of process/history data.
the entity name (e.g., 'user', 'order', 'product')
the workflow/job id
Optional options: CallOptions = {} - call options
Connects a function to the operational data layer.
The options for connecting a function.
True if connection is successfully established.
// Instantiate MeshData with Redis configuration.
const meshData = new MeshData(Redis, { host: 'localhost', port: 6379 });
// Define and connect a function with the 'greeting' entity.
// The function will be cached indefinitely (infinite TTL).
meshData.connect({
entity: 'greeting',
target: (email, user) => `Hello, ${user.first}.`,
options: { ttl: 'infinity' }
});
Creates a search index for the configured search backend (e.g., Redis FT.SEARCH).
the entity name (e.g., 'user', 'order', 'product')
Optional options: CallOptions = {} - call options
Optional searchOptions: WorkflowSearchOptions - search options
// create a search index for the 'greeting' entity. pass in search options.
const index = await meshData.createSearchIndex('greeting', {}, { prefix: 'greeting', ... });
// creates a search index for the 'greeting' entity, using the default search options.
const index = await meshData.createSearchIndex('greeting');
Deletes one or more fields from the remote function state.
the entity name (e.g., 'user', 'order', 'product')
the job id
Optional options: CallOptions - call options
Executes a remote function by its global entity identifier with the specified arguments. If `options.ttl` is 'infinity', the function will be cached indefinitely and can only be removed by calling `flush`. During this time, the function will remain active, and its state can be augmented by calling `set`, `incr`, `del`, etc., or by calling a transactional 'hook' function.
The execution parameters.
A promise that resolves with the result of the remote function execution. If the input options include `await: false`, the promise will resolve with the workflow ID (string) instead of the result. Make sure to pass `string` as the return type if you are using `await: false`.
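As a sketch of the calling convention: the parameter and option names below are assumptions inferred from this page's other examples, so confirm them against the execution-parameter types in your MeshData version.

```typescript
// Hypothetical `exec` invocation; parameter and option names are assumptions
// inferred from this page. Confirm against your MeshData version's types.
const execParams = {
  entity: 'greeting',
  args: ['jsmith@hotmesh.com', { first: 'Jan' }],
  options: { id: 'jsmith123', ttl: 'infinity' },
};
// const greetingText = await meshData.exec<string>(execParams);
// With `options: { ..., await: false }`, the promise resolves with the
// workflow ID, so type the call as meshData.exec<string>(...) in that case.
console.log(execParams.options.id); // jsmith123
```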
Exports the job profile for the function execution, including all state, process, and timeline data. The information in the export is sufficient to capture the full state of the function in the moment and over time.
the entity name (e.g., 'user', 'order', 'product')
The workflow/job id
Optional options: ExportOptions - configuration options for the export
Optional namespace: string - the namespace for the client
Executes the search query; optionally specify other commands
Rest ...args: string[] - for those implementations without a search backend, this quasi-equivalent method is provided with a cursor for rudimentary pagination.
Optional options: FindJobsOptions = {} - provides a JSON abstraction for the backend search engine (e.g., `count`, `query`, `return`, `limit`)
NOTE: If the type is TAG for an entity, `.`, `@`, and `-` must be escaped.
the entity name (e.g., 'user', 'order', 'product')
find options (the query). A custom search schema may be provided.
Returns a number if `count` is true; otherwise, a SearchResults object.
const results = await meshData.findWhere('greeting', {
query: [
{ field: 'name', is: '=', value: 'John' },
{ field: 'age', is: '>', value: 2 },
{ field: 'quantity', is: '[]', value: [89, 89] }
],
count: false,
limit: { start: 0, size: 10 },
return: ['name', 'quantity']
});
// returns { count: 1, query: 'FT.SEARCH my-index @_name:"John" @_age:[2 +inf] @_quantity:[89 89] LIMIT 0 10', data: [ { name: 'John', quantity: '89' } ] }
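The mapping from the JSON query abstraction to FT.SEARCH clauses can be sketched as follows. This is illustrative only, not the library's implementation; the `_` field prefix and the operator renderings are inferred from the sample query string in the example response.

```typescript
// Illustrative sketch: render the documented query conditions as FT.SEARCH
// clauses, inferred from the sample query string in the example response.
type Condition = {
  field: string;
  is: '=' | '>' | '[]';
  value: string | number | [number, number];
};

function toSearchClause({ field, is, value }: Condition): string {
  const f = `@_${field}`; // state fields appear prefixed with `_` in the index
  switch (is) {
    case '=':
      return `${f}:"${value}"`;
    case '>':
      return `${f}:[${value} +inf]`;
    case '[]':
      return `${f}:[${(value as [number, number]).join(' ')}]`;
  }
}

const clause = [
  { field: 'name', is: '=', value: 'John' } as Condition,
  { field: 'age', is: '>', value: 2 } as Condition,
  { field: 'quantity', is: '[]', value: [89, 89] } as Condition,
].map(toSearchClause).join(' ');

console.log(clause);
// @_name:"John" @_age:[2 +inf] @_quantity:[89 89]
```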
Flushes a function with a `ttl` of 'infinity'. These entities were created by a connect method that was configured with a `ttl` of 'infinity'. It can take several seconds for the function to be removed from the cache, as it might be actively orchestrating sub-workflows.
the entity name (e.g., 'user', 'order', 'product')
The workflow/job id
Optional namespace: string - the namespace for the client
Returns the remote function state. This is different than the function response returned by the `exec` method, which represents the return value from the function at the moment it completed. Instead, function state represents mutable shared state that can be set when the job is created (by passing `options.search.data` to `exec`) or at any time thereafter (by calling `meshData.set(...)`).
the entity name (e.g., 'user', 'order', 'product')
the job id
Optional options: CallOptions = {} - call options
Similar to `exec`, except it augments the workflow state without creating a new job.
The input parameters for hooking a function.
The signal id.
Increments a field in the remote function state.
the entity name (e.g., 'user', 'order', 'product')
the job id
the field name
the amount to increment
Optional options: CallOptions = {} - call options
Retrieves the job profile for the function execution, including metadata such as execution status and result.
the entity name (e.g., 'user', 'order', 'product')
identifier for the job
Optional options: CallOptions = {} - configuration options for the execution, including custom IDs, time-to-live (TTL) settings, etc. Defaults to an empty object if not provided.
A promise that resolves with the job's output, which includes metadata about the job's execution status. The structure of `JobOutput` should contain all relevant information such as the execution result, status, and any error messages if the job failed.
// Retrieve information about a remote function's execution by job ID
const jobInfoById = await meshData.info('greeting', 'job-12345');
// Response: JobOutput
{
metadata: {
tpc: 'meshflow.execute',
app: 'meshflow',
vrs: '1',
jid: 'greeting-jsmith123',
aid: 't1',
ts: '0',
jc: '20240208014803.980',
ju: '20240208065017.762',
js: 0
},
data: {
done: true,
response: 'Hello, Jan. Your email is [jsmith@hotmesh.com].',
workflowId: 'greeting-jsmith123'
}
}
Interrupts a job by its entity and id. It is best not to call this method directly for entries with a `ttl` of 'infinity' (call `flush` instead). For entities that are cached for a specified duration (e.g., '15 minutes'), this method will interrupt the job and start the cascaded cleanup/expire/delete. As jobs are asynchronous, there is no way to stop descendant flows immediately. Use an `expire` option to keep the interrupted job in the cache for a specified duration before it is fully removed.
the entity name (e.g., 'user', 'order', 'product')
The workflow/job id
Optional options: JobInterruptOptions = {} - call options
Optional namespace: string - the namespace for the client
Returns the HASH key, given an entity name and workflow/job id. The item identified by this key is a HASH record with multidimensional process data interleaved with the function state data.
the entity name (e.g., 'user', 'order', 'product')
the workflow/job id
Optional namespace: string - the namespace for the client
Returns all fields in the HASH record:
`:` - workflow status (a semaphore where `0` is complete)
`_*` - function state (name/value pairs are prefixed with `_`)
`-*` - workflow cycle state (cycles are prefixed with `-`)
`[a-zA-Z]{3}` - mutable workflow job state
`[a-zA-Z]{3}[,\d]+` - immutable workflow activity state
the entity name (e.g., 'user', 'order', 'product')
the workflow/job id
Optional options: CallOptions = {} - call options
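The field-name conventions above can be sketched as a small classifier. This is illustrative only; the authoritative semantics of each field are defined by the library.

```typescript
// Illustrative classifier for raw HASH field names, based on the documented
// prefix conventions for the HASH record (a sketch, not library code).
function classifyField(name: string): string {
  if (name === ':') return 'workflow status';
  if (name.startsWith('_')) return 'function state';
  if (name.startsWith('-')) return 'workflow cycle state';
  if (/^[a-zA-Z]{3}$/.test(name)) return 'mutable workflow job state';
  if (/^[a-zA-Z]{3}[,\d]+$/.test(name)) return 'immutable workflow activity state';
  return 'unknown';
}

console.log(classifyField(':'));      // workflow status
console.log(classifyField('_email')); // function state
console.log(classifyField('tpc'));    // mutable workflow job state
console.log(classifyField('aaa,1'));  // immutable workflow activity state
```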
Sends a signal to the backend Service Mesh (workers and engines) to announce their presence, including message counts, target functions, topics, etc. This is useful for establishing the network profile and overall message throughput of the operational data layer as a unified quorum.
Sets the remote function state. This is different than the function response returned by the `exec` method, which represents the return value from the function at the moment it completed. Instead, function state represents mutable shared state that can be set at any time.
the entity name (e.g., 'user', 'order', 'product')
the job id
Optional options: CallOptions = {} - call options
Signals a paused Hook Function or Main Function that is registered to awaken upon receiving the signal matching @guid.
The global identifier for the signal
The payload to send with the signal
Optional namespace: string - the namespace for the client
Throttles a worker or engine in the backend Service Mesh, using either a 'guid' to target a specific worker or engine, or a 'topic' to target a group of worker(s) connected to that topic. The throttle value is specified in milliseconds and will cause the target(s) to delay consuming the next message by this amount. By default, the value is set to `0`.
Static
shutdown
The `MeshData` service wraps the `MeshFlow` service. It serves to unify both data record concepts and transactional workflow principles into a single Operational Data Layer. Deployments with a 'search' provider configured (e.g., Redis FT.SEARCH) can deliver both OLTP (transactions) and OLAP (analytics) with no additional infrastructure.

The following example depicts the full end-to-end lifecycle of a `MeshData` app, including the connection of a worker function, the execution of a remote function, the retrieval of data, the creation of a search index, and the execution of a full-text search query.

Example