The MeshData service wraps the MeshFlow service. It unifies data record concepts and transactional workflow principles into a single Operational Data Layer. Deployments with the Redis FT.SEARCH module enabled can deliver both OLTP (transactions) and OLAP (analytics) with no additional infrastructure.

The following example depicts the full end-to-end lifecycle of a MeshData app, including the connection of a worker function, the execution of a remote function, the retrieval of data from Redis, the creation of a search index, and the execution of a full-text search query.

import { MeshData, Types } from '@hotmeshio/hotmesh';
import * as Redis from 'redis';

//1) Define a search schema
const schema = {
  schema: {
    id: { type: 'TAG', sortable: true },
    plan: { type: 'TAG', sortable: true },
    active: { type: 'TEXT', sortable: false },
  },
  index: 'user',
  prefix: ['user'], //index items with keys starting with 'user'
} as unknown as Types.WorkflowSearchOptions;

//2) Initialize MeshData and Redis
const meshData = new MeshData(
  Redis,
  { url: 'redis://:key_admin@redis:6379' },
  schema,
);

//3) Connect a 'user' worker function
await meshData.connect({
  entity: 'user',
  target: async function(userID: string): Promise<string> {
    //use the `search` extension to add searchable data
    const search = await MeshData.workflow.search();
    await search.set('active', 'yes');
    return `Welcome, ${userID}.`;
  },
  options: { namespace: 'meshdata' },
});

const userID = 'someTestUser123';

//4) Call the 'user' worker function; include search data
const response = await meshData.exec({
  entity: 'user',
  args: [userID],
  options: {
    ttl: 'infinity',
    id: userID,
    search: {
      data: { id: userID, plan: 'pro' }
    },
    namespace: 'meshdata',
  },
});

//5) Read data (by field name) directly from Redis
const data = await meshData.get(
  'user',
  userID,
  {
    fields: ['plan', 'id', 'active'],
    namespace: 'meshdata'
  },
);

//6) Create a search index
await meshData.createSearchIndex('user', { namespace: 'meshdata' }, schema);

//7) Perform Full Text Search on the indexed dataset
const results = await meshData.findWhere('user', {
  query: [{ field: 'id', is: '=', value: userID }],
  limit: { start: 0, size: 100 },
  return: ['plan', 'id', 'active']
});

//8) Shutdown MeshData
await MeshData.shutdown();

Constructors

  • Parameters

    • redisClass: Partial<RedisClass>

      the Redis class/import (e.g, ioredis, redis)

    • redisOptions: Partial<RedisOptions>

      the Redis connection options. These are specific to the package (refer to their docs!). Each uses different property names and structures.

    • Optional search: WorkflowSearchOptions

      the Redis search options for JSON-based configuration of the Redis FT.SEARCH module index

    Returns MeshData

    // Instantiate MeshData with `ioredis`
    import Redis from 'ioredis';

    const meshData = new MeshData(Redis, {
      host: 'localhost',
      port: 6379,
      password: 'shhh123',
      db: 0,
    });

    // Instantiate MeshData with `redis`
    import * as Redis from 'redis';

    const meshData = new MeshData(Redis, { url: 'redis://:shhh123@localhost:6379' });
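
Because the two clients accept differently shaped options, it can help to derive one form from the other. The sketch below converts ioredis-style fields into a `redis://` URL of the kind passed to the `redis` package in the example above. `HostPortOptions` and `toRedisUrl` are hypothetical names used for illustration; they are not part of MeshData.

```typescript
// Hypothetical helper (not part of MeshData): convert ioredis-style
// options into a connection URL for the `redis` package.
interface HostPortOptions {
  host: string;
  port: number;
  password?: string;
  db?: number;
}

function toRedisUrl(opts: HostPortOptions): string {
  // The password goes in the userinfo section with an empty username.
  const auth = opts.password ? `:${encodeURIComponent(opts.password)}@` : '';
  // The logical database index, when given, becomes the URL path.
  const db = opts.db !== undefined ? `/${opts.db}` : '';
  return `redis://${auth}${opts.host}:${opts.port}${db}`;
}

const url = toRedisUrl({ host: 'localhost', port: 6379, password: 'shhh123', db: 0 });
// url === 'redis://:shhh123@localhost:6379/0'
```

The password is URL-encoded so that characters such as '@' or ':' inside it do not break the URL.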

Properties

mesh: {
    pub: ((message: QuorumMessage, options?: SubscriptionOptions) => Promise<void>);
    sub: ((callback: QuorumMessageCallback, options?: SubscriptionOptions) => Promise<void>);
    unsub: ((callback: QuorumMessageCallback, options?: SubscriptionOptions) => Promise<void>);
} = ...

Exposes the service mesh control plane through the mesh 'events' (pub/sub) system. This is useful for monitoring and managing the operational data layer.

Type declaration

Redis FT search configuration (indexed/searchable fields and types)

proxyActivities: Object = MeshFlow.workflow.proxyActivities

Wrap activities in a proxy that will durably run them, once.

workflow: {
    execChild: (<T>(options?: Partial<WorkflowOptions>) => Promise<T>);
    executeChild: (<T>(options?: Partial<WorkflowOptions>) => Promise<T>);
    getContext: (() => WorkflowContext);
    getHotMesh: (() => Promise<HotMesh>);
    hook: ((options: HookOptions) => Promise<string>);
    interrupt: ((entity: string, id: string, options?: JobInterruptOptions) => Promise<void>);
    once: (<T>(fn: ((...args: any[]) => Promise<T>), ...args: any[]) => Promise<T>);
    random: (() => number);
    search: (() => Promise<Search>);
    signal: ((signalId: string, data: Record<any, any>) => Promise<string>);
    sleep: ((duration: string) => Promise<number>);
    sleepFor: ((duration: string) => Promise<number>);
    startChild: ((options?: Partial<WorkflowOptions>) => Promise<string>);
    waitFor: (<T>(signalId: string) => Promise<T>);
    waitForSignal: (<T>(signalId: string) => Promise<T>);
} = ...

Provides a set of static extensions that can be invoked by your linked workflow functions during their execution.

Type declaration

  • execChild: (<T>(options?: Partial<WorkflowOptions>) => Promise<T>)

    Starts a new, subordinated workflow/job execution. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.

      • <T>(options?): Promise<T>
      • Type Parameters

        • T

          The expected return type of the target function.

        Parameters

        Returns Promise<T>

  • executeChild: (<T>(options?: Partial<WorkflowOptions>) => Promise<T>)

    Starts a new, subordinated workflow/job execution. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.

      • <T>(options?): Promise<T>
      • Type Parameters

        • T

          The expected return type of the target function.

        Parameters

        Returns Promise<T>

  • getContext: (() => WorkflowContext)
  • getHotMesh: (() => Promise<HotMesh>)
      • (): Promise<HotMesh>
      • Returns a handle to the HotMesh client hosting the workflow execution

        Returns Promise<HotMesh>

  • hook: ((options: HookOptions) => Promise<string>)
      • (options): Promise<string>
      • Spawns a hook from either the main thread or a hook thread with the provided options. workflowId/TaskQueue/Name are optional and default to the current workflowId/WorkflowTopic if not provided.

        Parameters

        Returns Promise<string>

  • interrupt: ((entity: string, id: string, options?: JobInterruptOptions) => Promise<void>)

    Interrupts a job by its entity and id.

      • (entity, id, options?): Promise<void>
      • Parameters

        Returns Promise<void>

  • once: (<T>(fn: ((...args: any[]) => Promise<T>), ...args: any[]) => Promise<T>)
      • <T>(fn, ...args): Promise<T>
      • Executes a function once and caches the result. If the function is called again, the cached result is returned. This is useful for wrapping expensive activity calls that should only be run once, but which might not require the cost and safety provided by proxyActivities.

        Type Parameters

        • T

          the result type

        Parameters

        • fn: ((...args: any[]) => Promise<T>)
            • (...args): Promise<T>
            • Parameters

              • Rest ...args: any[]

              Returns Promise<T>

        • Rest ...args: any[]

        Returns Promise<T>

  • random: (() => number)
      • (): number
      • Returns a random number between 0 and 1. This number is deterministic and will never vary for a given seed. This is useful for randomizing pathways in a workflow that can be safely replayed.

        Returns number

        • a random number between 0 and 1
  • search: (() => Promise<Search>)
      • (): Promise<Search>
      • Returns a search session for use when reading/writing to the workflow HASH. The search session provides access to methods like get, mget, set, del, and incr.

        Returns Promise<Search>

        • a search session
  • signal: ((signalId: string, data: Record<any, any>) => Promise<string>)
      • (signalId, data): Promise<string>
      • Sends signal data into any other paused thread (which is currently awaiting the signal)

        Parameters

        • signalId: string

          the signal id

        • data: Record<any, any>

          the signal data

        Returns Promise<string>

        • the stream id
  • sleep: ((duration: string) => Promise<number>)
      • (duration): Promise<number>
      • Sleeps the workflow for a duration. As the function is reentrant, upon reentry, the function will traverse prior execution paths up until the sleep command and then resume execution thereafter.

        Parameters

        • duration: string

          See the ms package for syntax examples: '1 minute', '2 hours', '3 days'

        Returns Promise<number>

        • resolved duration in seconds
  • sleepFor: ((duration: string) => Promise<number>)
      • (duration): Promise<number>
      • Sleeps the workflow for a duration. As the function is reentrant, upon reentry, the function will traverse prior execution paths up until the sleep command and then resume execution thereafter.

        Parameters

        • duration: string

          See the ms package for syntax examples: '1 minute', '2 hours', '3 days'

        Returns Promise<number>

        • resolved duration in seconds
  • startChild: ((options?: Partial<WorkflowOptions>) => Promise<string>)

    Starts a new, subordinated workflow/job execution, awaiting only the jobId, namely, the confirmation that the subordinated job has begun. NOTE: The child workflow's lifecycle is bound to the parent workflow, and it will be terminated/scrubbed when the parent workflow is terminated/scrubbed.

      • (options?): Promise<string>
      • Parameters

        Returns Promise<string>

  • waitFor: (<T>(signalId: string) => Promise<T>)
      • <T>(signalId): Promise<T>
      • Pauses the workflow until signalId is received.

        Type Parameters

        • T

          the result type

        Parameters

        • signalId: string

          a unique, shareable guid (e.g, 'abc123')

        Returns Promise<T>

        const result = await MeshFlow.workflow.waitFor<typeof resultType>('abc123');
        
  • waitForSignal: (<T>(signalId: string) => Promise<T>)
      • <T>(signalId): Promise<T>
      • Pauses the workflow until signalId is received.

        Type Parameters

        • T

          the result type

        Parameters

        • signalId: string

          a unique, shareable guid (e.g, 'abc123')

        Returns Promise<T>

        const result = await MeshFlow.workflow.waitFor<typeof resultType>('abc123');
        
async function greet(email: string, user: { first: string }): Promise<string> {
  //persist the user's email and newsletter preferences
  const search = await MeshData.workflow.search();
  await search.set('email', email, 'newsletter', 'yes');

  //hook a function to send a newsletter
  await MeshData.workflow.hook({
    entity: 'user.newsletter',
    args: [email]
  });

  return `Hello, ${user.first}. Your email is [${email}].`;
}
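
The reentrant behavior described for sleep and once can be hard to picture. The following is a minimal in-memory sketch of replay with position-based caching: each call made through `once` is recorded by its call order, so when the function is entered again, earlier calls return their stored results instead of running a second time. `ReplayLog`, `chargeCard`, and `sampleWorkflow` are hypothetical names used only for illustration. This is not how MeshData or MeshFlow is implemented, and a durable system would persist this log rather than hold it in memory.

```typescript
// Hypothetical, in-memory illustration of replay; NOT the MeshData implementation.
class ReplayLog {
  private results = new Map<number, unknown>();
  private position = 0;

  // Reset the call position at the start of every (re)execution.
  begin(): void {
    this.position = 0;
  }

  // Run `fn` the first time this call position is reached; on later
  // executions return the stored result without calling `fn` again.
  async once<T>(fn: (...args: any[]) => Promise<T>, ...args: any[]): Promise<T> {
    const index = this.position++;
    if (this.results.has(index)) {
      return this.results.get(index) as T;
    }
    const result = await fn(...args);
    this.results.set(index, result);
    return result;
  }
}

let sideEffects = 0;

async function chargeCard(amount: number): Promise<string> {
  sideEffects++; // counts how many times the expensive call really runs
  return `charged:${amount}`;
}

// A workflow-like function that may be entered more than once.
async function sampleWorkflow(log: ReplayLog): Promise<string> {
  log.begin();
  return log.once(chargeCard, 25);
}
```

Calling `sampleWorkflow` twice with the same `ReplayLog` yields the same string both times while `chargeCard` runs only once, which is the property that makes replay safe for calls with side effects.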

Methods

  • Returns the remote function state for all fields. NOTE: all can be less efficient than calling get as it returns all fields (HGETALL), not just the ones requested (HMGET). Depending upon the duration of the workflow, this could represent a large amount of process/history data.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the workflow/job id

    • Optional options: CallOptions = {}

      call options

    Returns Promise<StringAnyType>

    • the function state
    // get the state of the job (this is not the response...this is job state)
    const state = await meshData.all('greeting', 'jsmith123');

    // returns { fred: 'flintstone', barney: 'rubble', ... }
  • Connects a function to the operational data layer.

    Type Parameters

    • T

      The expected return type of the target function.

    Parameters

    Returns Promise<boolean>

    True if connection is successfully established.

    // Instantiate MeshData with Redis configuration.
    const meshData = new MeshData(Redis, { host: 'localhost', port: 6379 });

    // Define and connect a function with the 'greeting' entity.
    // The function will be cached indefinitely (infinite TTL).
    await meshData.connect({
      entity: 'greeting',
      target: (email, user) => `Hello, ${user.first}.`,
      options: { ttl: 'infinity' }
    });
  • Creates a search index for the specified entity (FT.SEARCH module). The index must be removed by calling FT.DROPINDEX directly in Redis.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • Optional options: CallOptions = {}

      call options

    • Optional searchOptions: WorkflowSearchOptions

      search options

    Returns Promise<void>

    • the search index name
    // create a search index for the 'greeting' entity. pass in search options.
    const index = await meshData.createSearchIndex('greeting', {}, { prefix: 'greeting', ... });

    // creates a search index for the 'greeting' entity, using the default search options.
    const index = await meshData.createSearchIndex('greeting');
  • Deletes one or more fields from the remote function state.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the job id

    • Optional options: CallOptions

      call options

    Returns Promise<number>

    • the count of fields deleted
    // remove two hash fields from the function state
    const count = await meshData.del('greeting', 'jsmith123', { fields: ['fred', 'barney'] });
  • Executes a remote function by its global entity identifier with specified arguments. If options.ttl is 'infinity', the function will be cached indefinitely and can only be removed by calling flush. During this time, the function will remain active and its state can be augmented by calling set, incr, del, etc., or by calling a transactional 'hook' function.

    Type Parameters

    • T

      The expected return type of the remote function.

    Parameters

    Returns Promise<T>

    A promise that resolves with the result of the remote function execution. If the input options include await: false, the promise will resolve with the workflow ID (string) instead of the result. Make sure to pass string as the return type if you are using await: false.

    // Invoke a remote function with arguments and options
    const response = await meshData.exec({
      entity: 'greeting',
      args: ['jsmith@hotmesh', { first: 'Jan' }],
      options: { ttl: '15 minutes', id: 'jsmith123' }
    });
  • Exports the job profile for the function execution, including all state, process, and timeline data. The information in the export is sufficient to capture the full state of the function in the moment and over time.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      The workflow/job id

    • Optional options: ExportOptions

      Configuration options for the export

    • Optional namespace: string

      the namespace for the client

    Returns Promise<MeshFlowJobExport>

    // Export a function
    await meshData.export('greeting', 'jsmith123');
  • Executes the Redis FT search query; optionally specify other commands.

    Parameters

    • entity: string
    • options: FindOptions
    • Rest ...args: string[]

    Returns Promise<string[] | [number] | (string | number | string[])[]>

    '@_quantity:[89 89]'
    
    '@_quantity:[89 89] @_name:"John"'
    
    'FT.search my-index @_quantity:[89 89]'
    
  • For those Redis implementations without the FT module, this quasi-equivalent method is provided that uses SCAN along with a custom match string to view jobs. A cursor is likewise provided in support of rudimentary pagination.

    Parameters

    Returns Promise<[string, string[]]>

    // find jobs
    const [cursor, jobs] = await meshData.findJobs({ match: 'greeting*' });

    // returns [ '0', [ 'hmsh:meshflow:j:greeting-jsmith123', 'hmsh:meshflow:j:greeting-jdoe456' ] ]
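
Cursor-based pagination with SCAN generally means calling repeatedly until the returned cursor comes back to '0'. The loop below sketches that pattern against a minimal interface. The `cursor` input field is an assumption, since the parameter list is not shown above, and `JobFinder`, `collectJobs`, and the in-memory `fakeFinder` are hypothetical names used only so the example runs without Redis.

```typescript
// Assumed shape: `match` appears in the example above; `cursor` is an assumption.
interface FindJobsInput {
  match: string;
  cursor?: string;
}

// Minimal interface for anything exposing a findJobs-style method.
interface JobFinder {
  findJobs(input: FindJobsInput): Promise<[string, string[]]>;
}

// Follow the cursor until SCAN reports '0', which marks the end of iteration.
async function collectJobs(finder: JobFinder, match: string): Promise<string[]> {
  const jobs: string[] = [];
  let cursor = '0';
  do {
    const [next, page] = await finder.findJobs({ match, cursor });
    jobs.push(...page);
    cursor = next;
  } while (cursor !== '0');
  return jobs;
}

// In-memory stand-in that serves two pages, for demonstration only.
const pages: Record<string, [string, string[]]> = {
  '0': ['17', ['hmsh:meshflow:j:greeting-jsmith123']],
  '17': ['0', ['hmsh:meshflow:j:greeting-jdoe456']],
};

const fakeFinder: JobFinder = {
  findJobs: async ({ cursor = '0' }) => pages[cursor],
};
```

Redis SCAN can return the same key more than once during an iteration, so callers that need unique results may want to deduplicate the collected list.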
  • Provides a JSON abstraction for the Redis FT.SEARCH command (e.g., count, query, return, limit). NOTE: If a field's type is TAG, the characters '.', '@', and '-' in its values must be escaped.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • options: FindWhereOptions

      find options (the query). A custom search schema may be provided to target any index on the Redis backend.

    Returns Promise<number | SearchResults>

    Returns a number if count is true, otherwise a SearchResults object.
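
As a small illustration of the escaping note for TAG fields, the helper below backslash-escapes the three characters named in that note before a value is used in a query. `escapeTagValue` is a hypothetical helper, not part of MeshData, and whether MeshData applies any escaping on your behalf is not stated here, so treat this as a sketch.

```typescript
// Hypothetical helper (not part of MeshData): backslash-escape the
// characters that the note says must be escaped for TAG fields.
function escapeTagValue(value: string): string {
  // `$&` re-inserts the matched character after the backslash.
  return value.replace(/[.@-]/g, '\\$&');
}

const escaped = escapeTagValue('jsmith@hot-mesh.com');
// escaped is the string: jsmith\@hot\-mesh\.com
```

The escaped value could then be supplied as the `value` of a query clause that targets a TAG field.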

    const results = await meshData.findWhere('greeting', {
      query: [
        { field: 'name', is: '=', value: 'John' },
        { field: 'age', is: '>', value: 2 },
        { field: 'quantity', is: '[]', value: [89, 89] }
      ],
      count: false,
      limit: { start: 0, size: 10 },
      return: ['name', 'quantity']
    });

    // returns { count: 1, query: 'FT.SEARCH my-index @_name:"John" @_age:[2 +inf] @_quantity:[89 89] LIMIT 0 10', data: [ { name: 'John', quantity: '89' } ] }
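
To make the JSON-to-query mapping concrete, the sketch below rebuilds the query string shown in the example result from the same JSON input. It is inferred only from that one example (the `_` field prefix, quoted text for '=', and bracketed ranges). `buildQuery`, `clauseToFt`, `Clause`, and `Limit` are illustrative names, and the actual translation inside MeshData may differ.

```typescript
// Illustrative only: inferred from the single example above, not from MeshData's source.
type Clause =
  | { field: string; is: '='; value: string }
  | { field: string; is: '>'; value: number }
  | { field: string; is: '[]'; value: [number, number] };

interface Limit {
  start: number;
  size: number;
}

function clauseToFt(c: Clause): string {
  switch (c.is) {
    case '=':
      return `@_${c.field}:"${c.value}"`; // quoted text match
    case '>':
      return `@_${c.field}:[${c.value} +inf]`; // lower bound, as shown in the example
    case '[]':
      return `@_${c.field}:[${c.value[0]} ${c.value[1]}]`; // numeric range
  }
}

function buildQuery(index: string, query: Clause[], limit: Limit): string {
  const where = query.map(clauseToFt).join(' ');
  return `FT.SEARCH ${index} ${where} LIMIT ${limit.start} ${limit.size}`;
}

const ft = buildQuery(
  'my-index',
  [
    { field: 'name', is: '=', value: 'John' },
    { field: 'age', is: '>', value: 2 },
    { field: 'quantity', is: '[]', value: [89, 89] },
  ],
  { start: 0, size: 10 },
);
// ft === 'FT.SEARCH my-index @_name:"John" @_age:[2 +inf] @_quantity:[89 89] LIMIT 0 10'
```

In Redis query syntax a bracketed numeric bound is inclusive, and an exclusive lower bound is written with a leading parenthesis, as in `(2`. The sketch follows the documented output as-is.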
  • Flushes a function with a ttl of 'infinity'. These entities were created by a connect method that was configured with a ttl of 'infinity'. It can take several seconds for the function to be removed from the cache as it might be actively orchestrating sub-workflows.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      The workflow/job id

    • Optional namespace: string

      the namespace for the client

    Returns Promise<string | void>

    // Flush a function
    await meshData.flush('greeting', 'jsmith123');
  • Returns the remote function state. This differs from the function response returned by the exec method, which represents the return value from the function at the moment it completed. Instead, function state represents mutable shared state that can be set:

    1. when the record is first created (provide options.search.data to exec)
    2. during function execution ((await MeshData.workflow.search()).set(...))
    3. during hook execution ((await MeshData.workflow.search()).set(...))
    4. via the meshData SDK (meshData.set(...))

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the job id

    • Optional options: CallOptions = {}

      call options

    Returns Promise<StringAnyType>

    • the function state
    // get the state of a function
    const state = await meshData.get('greeting', 'jsmith123', { fields: ['fred', 'barney'] });

    // returns { fred: 'flintstone', barney: 'rubble' }
  • Similar to exec, except it augments the workflow state without creating a new job.

    Parameters

    • input: HookInput

      The input parameters for hooking a function.

    Returns Promise<string>

    The signal id.

    // Hook a function
    const signalId = await meshData.hook({
      entity: 'greeting',
      id: 'jsmith123',
      hookEntity: 'greeting.newsletter',
      hookArgs: ['xxxx@xxxxx'],
      options: {}
    });
  • Increments a field in the remote function state.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the job id

    • field: string

      the field name

    • amount: number

      the amount to increment

    • Optional options: CallOptions = {}

      call options

    Returns Promise<number>

    • the new value
    // increment a field in the function state
    const count = await meshData.incr('greeting', 'jsmith123', 'counter', 1);
  • Retrieves the job profile for the function execution, including metadata such as execution status and result.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      identifier for the job

    • Optional options: CallOptions = {}

      Configuration options for the execution, including custom IDs, time-to-live (TTL) settings, etc. Defaults to an empty object if not provided.

    Returns Promise<JobOutput>

    A promise that resolves with the job's output, which includes metadata about the job's execution status. The structure of JobOutput should contain all relevant information such as execution result, status, and any error messages if the job failed.

    // Retrieve information about a remote function's execution by job ID
    const jobInfoById = await meshData.info('greeting', 'jsmith123');

    // Response: JobOutput
    {
      metadata: {
        tpc: 'meshflow.execute',
        app: 'meshflow',
        vrs: '1',
        jid: 'greeting-jsmith123',
        aid: 't1',
        ts: '0',
        jc: '20240208014803.980',
        ju: '20240208065017.762',
        js: 0
      },
      data: {
        done: true,
        response: 'Hello, Jan. Your email is [jsmith@hotmesh.com].',
        workflowId: 'greeting-jsmith123'
      }
    }
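
The metadata fields are terse, so a small reader can help when inspecting job output. The sketch below assumes, based only on the sample response above, that `jc` and `ju` are created/updated timestamps in `YYYYMMDDHHmmss.SSS` form (treated here as UTC) and that `js` is a status semaphore where 0 means complete. `JobMetadataLike`, `parseJobTimestamp`, and `isComplete` are hypothetical names, not part of MeshData.

```typescript
// Assumptions (inferred from the sample response, not confirmed):
//  - jc/ju look like 'YYYYMMDDHHmmss.SSS' and are treated as UTC
//  - js is a status semaphore where 0 means the job is complete
interface JobMetadataLike {
  jc: string;
  ju: string;
  js: number;
}

function parseJobTimestamp(stamp: string): Date {
  const m = /^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\.(\d{3})$/.exec(stamp);
  if (!m) {
    throw new Error(`Unrecognized timestamp: ${stamp}`);
  }
  const [year, month, day, hour, minute, second, ms] = m.slice(1).map(Number);
  // Date.UTC months are zero-based.
  return new Date(Date.UTC(year, month - 1, day, hour, minute, second, ms));
}

function isComplete(metadata: JobMetadataLike): boolean {
  return metadata.js === 0;
}

const sampleMetadata: JobMetadataLike = {
  jc: '20240208014803.980',
  ju: '20240208065017.762',
  js: 0,
};
const created = parseJobTimestamp(sampleMetadata.jc);
// created.toISOString() === '2024-02-08T01:48:03.980Z'
```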
  • Interrupts a job by its entity and id. It is best not to call this method directly for entries with a ttl of infinity (call flush instead). For those entities that are cached for a specified duration (e.g., '15 minutes'), this method will interrupt the job and start the cascaded cleanup/expire/delete. As jobs are asynchronous, there is no way to stop descendant flows immediately. Use an expire option to keep the interrupted job in the cache for a specified duration before it is fully removed.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      The workflow/job id

    • Optional options: JobInterruptOptions = {}

      call options

    • Optional namespace: string

      the namespace for the client

    Returns Promise<void>

    // Interrupt a function
    await meshData.interrupt('greeting', 'jsmith123');
  • Lists all search indexes in the operational data layer when the targeted Redis backend supports the FT module.

    Returns Promise<string[]>

    // list all search indexes
    const indexes = await meshData.listSearchIndexes();

    // returns ['greeting', 'user', 'order', 'product']
  • Returns the Redis HASH key given an entity name and workflow/job. The item identified by this key is a HASH record with multidimensional process data interleaved with the function state data.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • workflowId: string

      the workflow/job id

    • Optional namespace: string

      the namespace for the client

    Returns Promise<string>

    // mint a key
    const key = await meshData.mintKey('greeting', 'jsmith123');

    // returns 'hmsh:meshflow:j:greeting-jsmith123'
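
Judging only from the sample keys on this page, a job key appears to combine a fixed `hmsh` prefix, the namespace, a `j` marker, and the entity and id joined by a hyphen. The sketch below reproduces that shape for illustration. `sketchJobKey` is a hypothetical function, the default namespace of `meshflow` is inferred from the examples, and `mintKey` remains the authoritative way to obtain a key.

```typescript
// Hypothetical reconstruction of the key shape seen in the examples;
// use meshData.mintKey for real keys.
function sketchJobKey(entity: string, id: string, namespace = 'meshflow'): string {
  return `hmsh:${namespace}:j:${entity}-${id}`;
}

const sketchedKey = sketchJobKey('greeting', 'jsmith123');
// sketchedKey === 'hmsh:meshflow:j:greeting-jsmith123'
```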
  • Returns all fields in the HASH record from Redis (HGETALL). Record fields include the following:

    1. ':': workflow status (a semaphore where 0 is complete)
    2. _*: function state (name/value pairs are prefixed with _)
    3. -*: workflow cycle state (cycles are prefixed with -)
    4. [a-zA-Z]{3}: mutable workflow job state
    5. [a-zA-Z]{3}[,\d]+: immutable workflow activity state

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the workflow/job id

    • Optional options: CallOptions = {}

      call options

    Returns Promise<StringAnyType>

    • the function state
    // get the state of a function
    const state = await meshData.raw('greeting', 'jsmith123');

    // returns { ':': '0', _barney: 'rubble', aBa: 'Hello, John Doe. Your email is [jsmith@hotmesh].', ... }
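
Since the raw record interleaves several kinds of fields, a classifier built from the five patterns listed above can separate function state from process data. The sketch below is one possible reading of those patterns. `FieldKind`, `classifyField`, and `extractState` are hypothetical names, not part of MeshData.

```typescript
// Hypothetical helpers that apply the five field patterns listed above.
type FieldKind = 'status' | 'state' | 'cycle' | 'job' | 'activity' | 'unknown';

function classifyField(name: string): FieldKind {
  if (name === ':') return 'status'; // workflow status semaphore
  if (name.startsWith('_')) return 'state'; // function state
  if (name.startsWith('-')) return 'cycle'; // workflow cycle state
  if (/^[a-zA-Z]{3}$/.test(name)) return 'job'; // mutable job state
  if (/^[a-zA-Z]{3}[,\d]+$/.test(name)) return 'activity'; // immutable activity state
  return 'unknown';
}

// Keep only function state and drop the leading underscore from each name.
function extractState(raw: Record<string, string>): Record<string, string> {
  const result: Record<string, string> = {};
  for (const [name, value] of Object.entries(raw)) {
    if (classifyField(name) === 'state') {
      result[name.slice(1)] = value;
    }
  }
  return result;
}

const functionState = extractState({ ':': '0', _barney: 'rubble', aBa: 'Hello' });
// functionState contains a single entry: barney -> 'rubble'
```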
  • Sends a signal to the backend Service Mesh (workers and engines) to announce their presence, including message counts, target functions, topics, etc. This is useful for establishing the network profile and overall message throughput of the operational data layer as a unified quorum.

    Parameters

    Returns Promise<QuorumProfile[]>

  • Sets the remote function state. This differs from the function response returned by the exec method, which represents the return value from the function at the moment it completed. Instead, function state represents mutable shared state.

    Parameters

    • entity: string

      the entity name (e.g, 'user', 'order', 'product')

    • id: string

      the job id

    • Optional options: CallOptions = {}

      call options

    Returns Promise<number>

    • count
    // set the state of a function
    const count = await meshData.set('greeting', 'jsmith123', { search: { data: { fred: 'flintstone', barney: 'rubble' } } });
  • Signals a paused hook function or main function to awaken. The target must be registered to awaken upon receiving the signal matching guid.

    Parameters

    • guid: string

      The global identifier for the signal

    • payload: StringAnyType

      The payload to send with the signal

    • Optional namespace: string

      the namespace for the client

    Returns Promise<string>

    • the signal id
    // Signal a function with a payload
    await meshData.signal('signal123', { message: 'hi!' });

    // returns '123456732345-0' (redis stream message receipt)
  • Throttles a worker or engine in the backend Service Mesh, using either a 'guid' to target a specific worker or engine, or a 'topic' to target a group of worker(s) connected to that topic. The throttle value is specified in milliseconds and will cause the target(s) to delay consuming the next message by this amount. By default, the value is set to 0.

    Parameters

    Returns Promise<boolean>

    // Throttle a worker or engine
    await meshData.throttle({ guid: '1234567890', throttle: 10_000 });