Kafka Datasource

Last updated: 8 minutes read.

The Kafka DataSource is able to subscribe to Kafka topics and query the events with GraphQL.

The Kafka DataSource utilizes consumer groups to subscribe to the given topics, and inherits all behavior of the consumer group concept.

Consumer groups are made up of multiple cooperating consumers, and the membership of these groups can change over time. Users can easily add a new consumer to the group to scale the processing load. A consumer can also go offline either for planned maintenance or due to an unexpected failure. Kafka maintains the membership of each group and redistributes work when necessary.

When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic. You should know that if you add more consumers to a single group with a single topic than you have partitions, some consumers will be idle and get no messages.

Basic Configuration

You can find the full documentation for Kafka DataSource configuration here.

broker_addresses In order to work with the Kafka DataSource, you first need a running Kafka cluster. The configuration takes a list of known broker addresses and discovers the rest of the cluster.

{
    "broker_addresses": ["localhost:9092"]
}

topics The Kafka DataSource is able to subscribe to multiple topics at the same time but you should know that the structs of events have to match the same GraphQL schema.

{
    "topics": ["product-updates"]
}

group_id As mentioned earlier, the Kafka DataSource utilizes the consumer group concept to subscribe to topics. We use the group_id field to set the consumer group name.

{
    "group_id": "product-updates-group"
}

Multiple APIs can use the same group_id or you can run multiple subscription queries using the same API. Please keep in mind that the Kafka DataSource inherits all behaviors of the consumer group concept.

client_id Finally, we need the client_id field to complete the configuration. It is a user-provided string that is sent with every request to the brokers for logging, debugging, and auditing purposes.

{
    "client_id": "tyk-kafka-integration"
}

Here is the final configuration for the Kafka DataSource:

{
    "broker_addresses": ["localhost:9092"],
    "topics": ["product-updates"],
    "group_id": "product-updates-group",
    "client_id": "tyk-kafka-integration"
}

The above configuration object is just a part of the API Definition Object of Tyk Gateway.

Kafka Datasource configuration via Dashboard

  1. Click on the field which should have Kafka datasource attached

  2. From the right-hand side Configure data source panel choose KAFKA at the bottom in the Add a new external data source section

Kafkaconfig

  1. Provide datasource name, broker address (at least 1), topics (at least 1), groupID, clientID. Optionally you can also choose Kafka version, balance strategy and field mapping options.

  2. Click SAVE button to persist the configuration.

Once done the field you just configured will show information about data source type and name:

KafkaList

Subscribing to topics

The Subscription type always defines the top-level fields that consumers can subscribe to. Let’s consider the following definition:

type Product {
  name: String
  price: Int
  inStock: Int
}

type Subscription {
  productUpdated: Product
}

The productUpdated field will be updated each time a product is updated. Updating a product means a price or inStock fields of Product are updated and an event is published to a Kafka topic. Consumers can subscribe to the productUpdated field by sending the following query to the server:

subscription Products {
    productUpdated {
        name
        price
        inStock
    }
}

You can use any GraphQL client that supports subscriptions.

Publishing events for testing

In order to test the Kafka DataSource, you can publish the following event to product-updates topic:

{
    "productUpdated": {
        "name": "product1",
        "price": 1624,
        "inStock": 219
    }
}

You can use any Kafka client or GUI to publish events to product-updates.

When you change any of the fields, all subscribers of the productUpdatedkafk field are going to receive the new product info.

The result should be similar to the following:

API Menu

API Definition for the Kafka DataSource

The Kafka DataSource configuration:

{
	"kind": "Kafka",
	"name": "kafka-consumer-group",
	"internal": false,
	"root_fields": [{
		"type": "Subscription",
		"fields": [
			"productUpdated"
		]
	}],
	"config": {
		"broker_addresses": [
			"localhost:9092"
		],
		"topics": [
			"product-updates"
		],
		"group_id": "product-updates-group",
		"client_id": "tyk-kafka-integration"
	}
}

Here is a sample API definition for the Kafka DataSource.

{
	"created_at": "2022-09-15T16:19:07+03:00",
	"api_model": {},
	"api_definition": {
		"api_id": "7ec1a1c117f641847c5adddfdcd4630f",
		"jwt_issued_at_validation_skew": 0,
		"upstream_certificates": {},
		"use_keyless": true,
		"enable_coprocess_auth": false,
		"base_identity_provided_by": "",
		"custom_middleware": {
			"pre": [],
			"post": [],
			"post_key_auth": [],
			"auth_check": {
				"name": "",
				"path": "",
				"require_session": false,
				"raw_body_only": false
			},
			"response": [],
			"driver": "",
			"id_extractor": {
				"extract_from": "",
				"extract_with": "",
				"extractor_config": {}
			}
		},
		"disable_quota": false,
		"custom_middleware_bundle": "",
		"cache_options": {
			"cache_timeout": 60,
			"enable_cache": true,
			"cache_all_safe_requests": false,
			"cache_response_codes": [],
			"enable_upstream_cache_control": false,
			"cache_control_ttl_header": "",
			"cache_by_headers": []
		},
		"enable_ip_blacklisting": false,
		"tag_headers": [],
		"jwt_scope_to_policy_mapping": {},
		"pinned_public_keys": {},
		"expire_analytics_after": 0,
		"domain": "",
		"openid_options": {
			"providers": [],
			"segregate_by_client": false
		},
		"jwt_policy_field_name": "",
		"enable_proxy_protocol": false,
		"jwt_default_policies": [],
		"active": true,
		"jwt_expires_at_validation_skew": 0,
		"config_data": {},
		"notifications": {
			"shared_secret": "",
			"oauth_on_keychange_url": ""
		},
		"jwt_client_base_field": "",
		"auth": {
			"disable_header": false,
			"auth_header_name": "Authorization",
			"cookie_name": "",
			"name": "",
			"validate_signature": false,
			"use_param": false,
			"signature": {
				"algorithm": "",
				"header": "",
				"use_param": false,
				"param_name": "",
				"secret": "",
				"allowed_clock_skew": 0,
				"error_code": 0,
				"error_message": ""
			},
			"use_cookie": false,
			"param_name": "",
			"use_certificate": false
		},
		"check_host_against_uptime_tests": false,
		"auth_provider": {
			"name": "",
			"storage_engine": "",
			"meta": {}
		},
		"blacklisted_ips": [],
		"graphql": {
			"schema": "type Product {\n  name: String\n  price: Int\n  inStock: Int\n}\n\ntype Query {\n    topProducts(first: Int): [Product]\n}\n\ntype Subscription {\n  productUpdated: Product\n}",
			"enabled": true,
			"engine": {
				"field_configs": [{
						"type_name": "Query",
						"field_name": "topProducts",
						"disable_default_mapping": false,
						"path": [
							"topProducts"
						]
					},
					{
						"type_name": "Subscription",
						"field_name": "productUpdated",
						"disable_default_mapping": false,
						"path": [
							"productUpdated"
						]
					}
				],
				"data_sources": [{
						"kind": "GraphQL",
						"name": "topProducts",
						"internal": false,
						"root_fields": [{
							"type": "Query",
							"fields": [
								"topProducts"
							]
						}],
						"config": {
							"url": "http://localhost:4002/query",
							"method": "POST",
							"headers": {},
							"default_type_name": "Product"
						}
					},
					{
						"kind": "Kafka",
						"name": "kafka-consumer-group",
						"internal": false,
						"root_fields": [{
							"type": "Subscription",
							"fields": [
								"productUpdated"
							]
						}],
						"config": {
							"broker_addresses": [
								"localhost:9092"
							],
							"topics": [
								"product-updates"
							],
							"group_id": "product-updates-group",
							"client_id": "tyk-kafka-integration"
						}
					}
				]
			},
			"type_field_configurations": [],
			"execution_mode": "executionEngine",
			"proxy": {
				"auth_headers": {
					"Authorization": "Bearer eyJvcmciOiI2MWI5YmZmZTY4OGJmZWNmZjAyNGU5MzEiLCJpZCI6IjE1ZmNhOTU5YmU0YjRmMDFhYTRlODllNWE5MjczZWZkIiwiaCI6Im11cm11cjY0In0="
				}
			},
			"subgraph": {
				"sdl": ""
			},
			"supergraph": {
				"subgraphs": [],
				"merged_sdl": "",
				"global_headers": {},
				"disable_query_batching": false
			},
			"version": "2",
			"playground": {
				"enabled": false,
				"path": "/playground"
			},
			"last_schema_update": "2022-09-15T16:45:42.062+03:00"
		},
		"hmac_allowed_clock_skew": -1,
		"dont_set_quota_on_create": false,
		"uptime_tests": {
			"check_list": [],
			"config": {
				"expire_utime_after": 0,
				"service_discovery": {
					"use_discovery_service": false,
					"query_endpoint": "",
					"use_nested_query": false,
					"parent_data_path": "",
					"data_path": "",
					"cache_timeout": 60
				},
				"recheck_wait": 0
			}
		},
		"enable_jwt": false,
		"do_not_track": false,
		"name": "Kafka DataSource",
		"slug": "kafka-datasource",
		"analytics_plugin": {},
		"oauth_meta": {
			"allowed_access_types": [],
			"allowed_authorize_types": [],
			"auth_login_redirect": ""
		},
		"CORS": {
			"enable": false,
			"max_age": 24,
			"allow_credentials": false,
			"exposed_headers": [],
			"allowed_headers": [
				"Origin",
				"Accept",
				"Content-Type",
				"X-Requested-With",
				"Authorization"
			],
			"options_passthrough": false,
			"debug": false,
			"allowed_origins": [
				"*"
			],
			"allowed_methods": [
				"GET",
				"POST",
				"HEAD"
			]
		},
		"event_handlers": {
			"events": {}
		},
		"proxy": {
			"target_url": "",
			"service_discovery": {
				"endpoint_returns_list": false,
				"cache_timeout": 0,
				"parent_data_path": "",
				"query_endpoint": "",
				"use_discovery_service": false,
				"_sd_show_port_path": false,
				"target_path": "",
				"use_target_list": false,
				"use_nested_query": false,
				"data_path": "",
				"port_data_path": ""
			},
			"check_host_against_uptime_tests": false,
			"transport": {
				"ssl_insecure_skip_verify": false,
				"ssl_min_version": 0,
				"proxy_url": "",
				"ssl_ciphers": []
			},
			"target_list": [],
			"preserve_host_header": false,
			"strip_listen_path": true,
			"enable_load_balancing": false,
			"listen_path": "/kafka-datasource/",
			"disable_strip_slash": true
		},
		"client_certificates": [],
		"use_basic_auth": false,
		"version_data": {
			"not_versioned": true,
			"default_version": "",
			"versions": {
				"Default": {
					"name": "Default",
					"expires": "",
					"paths": {
						"ignored": [],
						"white_list": [],
						"black_list": []
					},
					"use_extended_paths": true,
					"extended_paths": {
						"ignored": [],
						"white_list": [],
						"black_list": [],
						"transform": [],
						"transform_response": [],
						"transform_jq": [],
						"transform_jq_response": [],
						"transform_headers": [],
						"transform_response_headers": [],
						"hard_timeouts": [],
						"circuit_breakers": [],
						"url_rewrites": [],
						"virtual": [],
						"size_limits": [],
						"method_transforms": [],
						"track_endpoints": [],
						"do_not_track_endpoints": [],
						"validate_json": [],
						"internal": []
					},
					"global_headers": {},
					"global_headers_remove": [],
					"global_response_headers": {},
					"global_response_headers_remove": [],
					"ignore_endpoint_case": false,
					"global_size_limit": 0,
					"override_target": ""
				}
			}
		},
		"jwt_scope_claim_name": "",
		"use_standard_auth": false,
		"session_lifetime": 0,
		"hmac_allowed_algorithms": [],
		"disable_rate_limit": false,
		"definition": {
			"enabled": false,
			"name": "",
			"default": "",
			"location": "header",
			"key": "x-api-version",
			"strip_path": false,
			"strip_versioning_data": false,
			"versions": {}
		},
		"use_oauth2": false,
		"jwt_source": "",
		"jwt_signing_method": "",
		"jwt_not_before_validation_skew": 0,
		"use_go_plugin_auth": false,
		"jwt_identity_base_field": "",
		"allowed_ips": [],
		"request_signing": {
			"is_enabled": false,
			"secret": "",
			"key_id": "",
			"algorithm": "",
			"header_list": [],
			"certificate_id": "",
			"signature_header": ""
		},
		"org_id": "630899e6688bfe5fd6bbe679",
		"enable_ip_whitelisting": false,
		"global_rate_limit": {
			"rate": 0,
			"per": 0
		},
		"protocol": "",
		"enable_context_vars": false,
		"tags": [],
		"basic_auth": {
			"disable_caching": false,
			"cache_ttl": 0,
			"extract_from_body": false,
			"body_user_regexp": "",
			"body_password_regexp": ""
		},
		"listen_port": 0,
		"session_provider": {
			"name": "",
			"storage_engine": "",
			"meta": {}
		},
		"auth_configs": {
			"authToken": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"basic": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"coprocess": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"hmac": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"jwt": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"oauth": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			},
			"oidc": {
				"disable_header": false,
				"auth_header_name": "Authorization",
				"cookie_name": "",
				"name": "",
				"validate_signature": false,
				"use_param": false,
				"signature": {
					"algorithm": "",
					"header": "",
					"use_param": false,
					"param_name": "",
					"secret": "",
					"allowed_clock_skew": 0,
					"error_code": 0,
					"error_message": ""
				},
				"use_cookie": false,
				"param_name": "",
				"use_certificate": false
			}
		},
		"strip_auth_data": false,
		"id": "6323264b688bfe40b7d71ab3",
		"certificates": [],
		"enable_signature_checking": false,
		"use_openid": false,
		"internal": false,
		"jwt_skip_kid": false,
		"enable_batch_request_support": false,
		"enable_detailed_recording": false,
		"scopes": {
			"jwt": {},
			"oidc": {}
		},
		"response_processors": [],
		"use_mutual_tls_auth": false
	},
	"hook_references": [],
	"is_site": false,
	"sort_by": 0,
	"user_group_owners": [],
	"user_owners": []
}