Workflow


The workflow processor in Tyk Streams efficiently manages and executes a topology of branch processors, enabling parallel execution where possible. This guide provides a comprehensive overview of both common and advanced configuration fields, demonstrating how to set up and optimize your workflows for performance and simplicity. By leveraging directed acyclic graphs (DAGs) and parallel processing, the workflow processor can significantly reduce message processing latency, especially when dealing with high-latency, low-CPU-activity processors. Detailed examples and explanations are provided to help you configure workflows that can automatically resolve dependencies or manually specify execution order, ensuring optimal performance and flexibility for your processing needs.

Common

# Common config fields, showing default values
label: ""
workflow:
  meta_path: meta.workflow
  order: []
  branches: {}

Advanced

# All config fields, showing default values
label: ""
workflow:
  meta_path: meta.workflow
  order: []
  branch_resources: []
  branches: {}

Why Use a Workflow

Performance

Most of the time the best way to compose processors is also the simplest: configure them in series. This is because processors are often CPU bound and low-latency, and you can gain vertical scaling by increasing the number of processor pipeline threads, allowing Tyk Streams to process multiple messages in parallel.

However, some processors such as aws_lambda or cache interact with external services and therefore spend most of their time waiting for a response. These processors tend to be high-latency with low CPU activity, which slows down message processing.

When a processing pipeline contains multiple network processors that aren't dependent on each other we can benefit from executing them in parallel for each individual message, reducing the overall message processing latency.
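
For example, a minimal sketch of a workflow that calls two independent external services for each message might look like this (the url and function values are placeholders); since neither branch depends on the result of the other, both are executed in parallel:

pipeline:
  processors:
    - workflow:
        branches:
          enrich_http:
            processors:
              - http:
                  url: TODO
            result_map: 'root.enriched.http = this'

          enrich_lambda:
            processors:
              - aws_lambda:
                  function: TODO
            result_map: 'root.enriched.lambda = this'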

Simplifying Processor Topology

A workflow is often expressed as a Directed Acyclic Graph of processing stages, where each stage can result in N possible next stages, until finally the flow ends at an exit node.

For example, if we had processing stages A, B, C and D, where stage A could result in either stage B or C being next, always followed by D, it might look something like this:

     /--> B --\
A --|          |--> D
     \--> C --/

This flow would be easy to express in a standard Tyk Streams config: we could simply use a switch processor to route to either B or C depending on a condition on the result of A. However, this method of flow control quickly becomes unfeasible as the DAG gets more complicated. Imagine expressing this flow using switch processors:

      /--> B -------------|--> D
     /                   /
A --|          /--> E --|
     \--> C --|          \
               \----------|--> F

Furthermore, the flow is likely to change over time. Instead, with a workflow we can either trust it to automatically resolve the DAG or express it manually as simply as order: [ [ A ], [ B, C ], [ E ], [ D, F ] ], and the conditional logic for determining whether a stage is executed is defined as part of the branch itself.
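
Expressed in config, that manual ordering would look something like this (the branch definitions themselves are omitted for brevity):

pipeline:
  processors:
    - workflow:
        # Tiers run in sequence, branches within a tier run in parallel
        order: [ [ A ], [ B, C ], [ E ], [ D, F ] ]
        branches:
          # A, B, C, D, E and F are configured here as named branches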

Examples

Automatic Ordering

When the field order is omitted a best attempt is made to determine a dependency tree between branches based on their request and result mappings. In the following example the branches foo and bar will be executed first in parallel, and afterwards the branch baz will be executed.

pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        branches:
          foo:
            request_map: 'root = ""'
            processors:
              - http:
                  url: TODO
            result_map: 'root.foo = this'

          bar:
            request_map: 'root = this.body'
            processors:
              - aws_lambda:
                  function: TODO
            result_map: 'root.bar = this'

          baz:
            request_map: |
              root.fooid = this.foo.id
              root.barstuff = this.bar.content              
            processors:
              - cache:
                  resource: TODO
                  operator: set
                  key: ${! json("fooid") }
                  value: ${! json("barstuff") }
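
Assuming the hypothetical http and aws_lambda calls return objects containing id and content fields respectively, the document seen by the baz branch might look something like this:

{
	"body": "the original document body",
	"foo": {
		"id": "abc123"
	},
	"bar": {
		"content": "some enriched content"
	}
}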

Conditional Branches

Branches of a workflow are skipped when the request_map assigns deleted() to the root. In this example the branch A is executed when the document type is “foo”, and branch B otherwise. Branch C is executed afterwards and is skipped unless either A or B successfully provided a result at tmp.result.

pipeline:
  processors:
    - workflow:
        branches:
          A:
            request_map: |
              root = if this.document.type != "foo" {
                  deleted()
              }              
            processors:
              - http:
                  url: TODO
            result_map: 'root.tmp.result = this'

          B:
            request_map: |
              root = if this.document.type == "foo" {
                  deleted()
              }              
            processors:
              - aws_lambda:
                  function: TODO
            result_map: 'root.tmp.result = this'

          C:
            request_map: |
              root = if this.tmp.result != null {
                  deleted()
              }              
            processors:
              - http:
                  url: TODO_SOMEWHERE_ELSE
            result_map: 'root.tmp.result = this'

Resources

The order field can be used to refer to branch processor resources, which can sometimes make your pipeline configuration cleaner as well as allowing you to reuse branch configurations in other places. It’s also possible to mix and match branches configured within the workflow and branches configured as resources.

pipeline:
  processors:
    - workflow:
        order: [ [ foo, bar ], [ baz ] ]
        branches:
          bar:
            request_map: 'root = this.body'
            processors:
              - aws_lambda:
                  function: TODO
            result_map: 'root.bar = this'

processor_resources:
  - label: foo
    branch:
      request_map: 'root = ""'
      processors:
        - http:
            url: TODO
      result_map: 'root.foo = this'

  - label: baz
    branch:
      request_map: |
        root.fooid = this.foo.id
        root.barstuff = this.bar.content        
      processors:
        - cache:
            resource: TODO
            operator: set
            key: ${! json("fooid") }
            value: ${! json("barstuff") }

Fields

meta_path

A dot path indicating where to store and reference structured metadata about the workflow execution.

Type: string
Default: "meta.workflow"

order

An explicit declaration of ordered branch tiers, which describes the order in which parallel tiers of branches should be executed. Branches should be identified by the name with which they are configured in the field branches. It’s also possible to specify branch processors configured as resources.

Type: two-dimensional array
Default: []

# Examples

order:
  - - foo
    - bar
  - - baz

order:
  - - foo
  - - bar
  - - baz

branch_resources

An optional list of branch processor names that are configured as resources. These resources will be included in the workflow with any branches configured inline within the branches field. The order and parallelism in which branches are executed is automatically resolved based on the mappings of each branch. When using resources with an explicit order it is not necessary to list resources in this field.

Type: array
Default: []
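
For instance, a sketch that pulls in two hypothetical branch resources (foo and baz, defined elsewhere) alongside an inline branch might look like this:

pipeline:
  processors:
    - workflow:
        branch_resources: [ foo, baz ]
        branches:
          bar:
            request_map: 'root = this.body'
            processors:
              - aws_lambda:
                  function: TODO
            result_map: 'root.bar = this'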

branches

An object of named branch processors that make up the workflow. The order and parallelism in which branches are executed can either be made explicit with the field order, or if omitted an attempt is made to automatically resolve an ordering based on the mappings of each branch.

Type: object
Default: {}

branches.name.request_map

A Bloblang mapping that describes how to create a request payload suitable for the child processors of this branch. If left empty then the branch will begin with an exact copy of the origin message (including metadata).

Type: string
Default: ""

# Examples

request_map: |-
  root = {
  	"id": this.doc.id,
  	"content": this.doc.body.text
  }  

request_map: |-
  root = if this.type == "foo" {
  	this.foo.request
  } else {
  	deleted()
  }  

branches.name.processors

A list of processors to apply to mapped requests. When processing message batches the resulting batch must match the size and ordering of the input batch, therefore filtering and grouping should not be performed within these processors.

Type: array

branches.name.result_map

A Bloblang mapping that describes how the resulting messages from branched processing should be mapped back into the original payload. If left empty the origin message will remain unchanged (including metadata).

Type: string
Default: ""

# Examples

result_map: |-
  meta foo_code = metadata("code")
  root.foo_result = this  

result_map: |-
  meta = metadata()
  root.bar.body = this.body
  root.bar.id = this.user.id  

result_map: root.raw_result = content().string()

result_map: |-
  root.enrichments.foo = if metadata("request_failed") != null {
    throw(metadata("request_failed"))
  } else {
    this
  }  

result_map: |-
  # Retain only the updated metadata fields which were present in the origin message
  meta = metadata().filter(v -> @.get(v.key) != null)  

Structured Metadata

When the field meta_path is non-empty the workflow processor creates an object describing which branches were successful, skipped or failed for each message and stores the object within the message at the end of processing.

The object is of the following form:

{
	"succeeded": [ "foo" ],
	"skipped": [ "bar" ],
	"failed": {
		"baz": "the error message from the branch"
	}
}

If a message already has a meta object at the given path when it is processed then the object is used to determine which branches have already been performed on the message (or skipped) and can therefore be skipped on this run.

This is a useful pattern when replaying messages that have failed some branches previously. For example, given the above example object the branches foo and bar would automatically be skipped, and baz would be reattempted.

The previous meta object will also be preserved in the field <meta_path>.previous when the new meta object is written, retaining a full record of all workflow executions.
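
For example, if the failed branch baz from the object above succeeds on a replay, the resulting meta object might look something like this:

{
	"succeeded": [ "baz" ],
	"skipped": [ "foo", "bar" ],
	"previous": {
		"succeeded": [ "foo" ],
		"skipped": [ "bar" ],
		"failed": {
			"baz": "the error message from the branch"
		}
	}
}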

If a field <meta_path>.apply exists in the meta object for a message and is an array then it will be used as an explicit list of stages to apply, all other stages will be skipped.
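
For example, a message carrying the following meta object would only have the branches foo and baz attempted on its next pass through the workflow:

{
	"apply": [ "foo", "baz" ]
}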

Resources

It’s common to configure processors (and other components) as resources in order to keep the pipeline configuration cleaner. With the workflow processor you can include branch processors configured as resources within your workflow by specifying them by name in the field order; if Tyk Streams doesn’t find a branch of that name within the workflow configuration it will look for a branch processor resource instead.

Alternatively, if you do not wish to have an explicit ordering, you can add resource names to the field branch_resources and they will be included in the workflow with automatic DAG resolution along with any branches configured in the branches field.

Resource Error Conditions

There are two error conditions that can occur when resources included in your workflow are mutated, and if you plan to mutate resources in your workflow it is important that you understand them.

The first error case is when a resource in the workflow is removed and not replaced. When this happens the workflow will still be executed but the individual branch will fail. This should only happen if you explicitly delete a branch resource, as any mutation operation will create the new resource before removing the old one.

The second error case is when automatic Directed Acyclic Graph (DAG) resolution is being used and a resource in the workflow is changed in a way that breaks the DAG (circular dependencies, etc). When this happens it is impossible to execute the workflow and therefore the processor will fail, which is possible to capture and handle using standard error handling patterns.

Error Handling

The recommended approach to handling failures within a workflow is to query the structured metadata it provides, which contains granular information about exactly which branches failed and which ones succeeded (and therefore don’t need to be performed again).

For example, if our meta object is stored at the path meta.workflow and we wanted to check whether a message has failed for any branch we can do that using a Bloblang query like this.meta.workflow.failed.length() | 0 > 0, or to check whether a specific branch failed we can use this.exists("meta.workflow.failed.foo").
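
As a sketch, one of these queries could drive a switch output that diverts messages with failed branches to a separate stream for later replay (the Kafka outputs and topic names here are purely illustrative):

output:
  switch:
    cases:
      - check: this.meta.workflow.failed.length() | 0 > 0
        output:
          kafka:
            addresses: [ TODO ]
            topic: enrichment_failures
      - output:
          kafka:
            addresses: [ TODO ]
            topic: enriched_documents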

However, if structured metadata is disabled by setting the field meta_path to empty then the workflow processor instead adds a general error flag to messages when any executed branch fails. In this case it’s possible to handle failures using standard error handling patterns.
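
For example, a sketch that logs failed messages using catch and log processors (assuming both are available in your deployment) might look like this:

pipeline:
  processors:
    - workflow:
        meta_path: ""
        branches:
          # branch definitions go here
    - catch:
        # these processors are only applied to messages that failed a branch
        - log:
            level: ERROR
            message: 'workflow branch failed: ${! error() }'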