Cached


This component is experimental and therefore subject to change or removal outside of major version releases.

Cache the result of applying one or more processors to messages identified by a key. If the key already exists within the cache, the contents of the message are replaced with the cached result instead of applying the processors. This component is therefore useful when an expensive set of processors need only be executed periodically.

# Config fields, showing default values
label: ""
cached:
  cache: "" # No default (required)
  skip_on: errored() # No default (optional)
  key: my_foo_result # No default (required)
  ttl: "" # No default (optional)
  processors: [] # No default (required)

The format of the data when stored within the cache is a custom, versioned schema chosen to balance performance and storage space. It is therefore not possible to point this processor at a cache that has been pre-populated with data that this processor did not create itself.

Examples

Cached Enrichment

In the following example we want to enrich messages consumed from Kafka with data specific to the origin topic partition. We do this by placing an http processor within a branch, where the HTTP URL contains interpolation functions with the topic and partition in the path.

However, it would be inefficient to make this HTTP request for every single message, as the result is consistent for all data of a given topic partition. We can solve this by placing our enrichment call within a cached processor where the key contains the topic and partition, so that messages originating from the same topic/partition combination reuse the cached result from an earlier message.

pipeline:
  processors:
    - branch:
        processors:
          - cached:
              key: '${! meta("kafka_topic") }-${! meta("kafka_partition") }'
              cache: foo_cache
              processors:
                - mapping: 'root = ""'
                - http:
                    url: http://example.com/enrichment/${! meta("kafka_topic") }/${! meta("kafka_partition") }
                    verb: GET
        result_map: 'root.enrichment = this'

cache_resources:
  - label: foo_cache
    memory:
      # Disable compaction so that cached items never expire
      compaction_interval: ""

Periodic Global Enrichment

In the following example we enrich all messages with the same data obtained from a static URL with an http processor within a branch. However, we expect the data from this URL to change roughly every 10 minutes, so we configure a cached processor with a static key (since this request is consistent for all messages) and a TTL of 10m.

pipeline:
  processors:
    - branch:
        request_map: 'root = ""'
        processors:
          - cached:
              key: static_foo
              cache: foo_cache
              ttl: 10m
              processors:
                - http:
                    url: http://example.com/get/foo.json
                    verb: GET
        result_map: 'root.foo = this'

cache_resources:
  - label: foo_cache
    memory: {}

Fields

cache

The cache resource to read processor results from and write them to.

Type: string

skip_on

A condition that can be used to skip caching the results from the processors.

Type: string

# Examples

skip_on: errored()
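
As a rough sketch of where this field sits in a full config, the following reuses the static-key HTTP enrichment from the examples above and adds skip_on so that results from failed requests are not stored. It is an illustration assembled from the fields documented on this page, not a recommended production setup.

```yaml
pipeline:
  processors:
    - cached:
        cache: foo_cache
        key: static_foo
        # Do not write the result to the cache when processing has failed.
        skip_on: errored()
        ttl: 10m
        processors:
          - http:
              url: http://example.com/get/foo.json
              verb: GET

cache_resources:
  - label: foo_cache
    memory: {}
```

With this in place, a message whose HTTP request fails should leave the cache untouched, so a later message with the same key would find no entry and run the processors again.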

key

A key to be resolved for each message. If the key already exists in the cache then the cached result is used; otherwise the processors are applied and the result is cached under this key. The key can be static, and therefore apply to all messages, or it can be an interpolated expression that is potentially unique for each message. This field supports interpolation functions.

Type: string

# Examples

key: my_foo_result

key: ${! this.document.id }

key: ${! meta("kafka_key") }

key: ${! meta("kafka_topic") }

ttl

An optional expiry period to set for each cache entry. Some caches only have a general TTL and will therefore ignore this setting. This field supports interpolation functions.

Type: string
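
Because this field supports interpolation functions, the expiry period could in principle be derived from the message rather than hard-coded. The sketch below assumes a hypothetical metadata key named cache_ttl, set by an earlier step and containing a duration string such as 10m. That key and the shortened enrichment URL are illustrative only and are not defined by this component.

```yaml
pipeline:
  processors:
    - cached:
        cache: foo_cache
        key: '${! meta("kafka_topic") }'
        # Assumption: "cache_ttl" is a hypothetical metadata value holding a
        # duration string such as 10m, set somewhere upstream.
        ttl: '${! meta("cache_ttl") }'
        processors:
          - http:
              # Illustrative URL, adapted from the enrichment example above.
              url: http://example.com/enrichment/${! meta("kafka_topic") }
              verb: GET

cache_resources:
  - label: foo_cache
    memory: {}
```

If the chosen cache only has a general TTL, the per-entry value resolved here would be ignored, as noted above.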

processors

The list of processors whose result will be cached.

Type: array