From 840f88a5a824b191a77ff6a15a856a236d3ad3eb Mon Sep 17 00:00:00 2001 From: busma13 Date: Fri, 22 Nov 2024 16:33:34 -0700 Subject: [PATCH 1/5] add cooperative-sticky option to kafka_reader --- asset/src/kafka_reader/interfaces.ts | 2 +- asset/src/kafka_reader_api/schema.ts | 2 +- docs/apis/kafka_reader_api.md | 2 +- docs/operations/kafka_reader.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/asset/src/kafka_reader/interfaces.ts b/asset/src/kafka_reader/interfaces.ts index e043318c..0806617b 100644 --- a/asset/src/kafka_reader/interfaces.ts +++ b/asset/src/kafka_reader/interfaces.ts @@ -59,7 +59,7 @@ export interface KafkaReaderConfig extends OpConfig { * Name of partition assignment strategy to use when elected group leader * assigns partitions to group members. */ - partition_assignment_strategy?: 'range' | 'roundrobin'; + partition_assignment_strategy?: 'range' | 'roundrobin' | 'cooperative-sticky'; /** * Name of kafka api used for reader, if none is provided, then one is made * and the name is kafka_reader_api, and is injected into the execution diff --git a/asset/src/kafka_reader_api/schema.ts b/asset/src/kafka_reader_api/schema.ts index 644523a0..b8c9a076 100644 --- a/asset/src/kafka_reader_api/schema.ts +++ b/asset/src/kafka_reader_api/schema.ts @@ -70,7 +70,7 @@ export const schema = { partition_assignment_strategy: { doc: 'Name of partition assignment strategy to use when elected group leader assigns partitions to group members.', default: '', - format: ['range', 'roundrobin', ''] + format: ['range', 'roundrobin', 'cooperative-sticky', ''] } }; diff --git a/docs/apis/kafka_reader_api.md b/docs/apis/kafka_reader_api.md index 06e8b14b..9b10810f 100644 --- a/docs/apis/kafka_reader_api.md +++ b/docs/apis/kafka_reader_api.md @@ -195,7 +195,7 @@ const results = await api.consume(query) | connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config | | max_poll_interval | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member| String/Duration | optional, defaults to `"5 minutes"` | | offset_reset | How offset resets should be handled when there are no valid offsets for the consumer group. May be set to `smallest`, `earliest`, `beginning`, `largest`, `latest` or `error` | String | optional, defaults to `smallest` | -| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` | +| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin`, `cooperative-sticky` or `""` | String | optional, defaults to `""` | | rollback_on_failure | Controls whether the consumer state is rolled back on failure. This will protect against data loss, however this can have an unintended side effect of blocking the job from moving if failures are minor and persistent. 
NOTE: This currently defaults to false due to the side effects of the behavior, at some point in the future it is expected this will default to true.| Boolean | optional, defaults to `false` | | use_commit_sync | Use commit sync instead of async (usually not recommended) | Boolean | optional, defaults to `false` | | wait |How long to wait for a full chunk of data to be available. Specified in milliseconds if you use a number. | String/Duration/Number | optional, defaults to `30 seconds` | diff --git a/docs/operations/kafka_reader.md b/docs/operations/kafka_reader.md index 805f1111..e325b317 100644 --- a/docs/operations/kafka_reader.md +++ b/docs/operations/kafka_reader.md @@ -75,7 +75,7 @@ results.length === 5000; | connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config | | max_poll_interval | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member| String/Duration | optional, defaults to `"5 minutes"` | | offset_reset | How offset resets should be handled when there are no valid offsets for the consumer group. May be set to `smallest`, `earliest`, `beginning`, `largest`, `latest` or `error` | String | optional, defaults to `smallest` | -| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` | +| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin`, `cooperative-sticky` or `""` | String | optional, defaults to `""` | | rollback_on_failure | Controls whether the consumer state is rolled back on failure. This will protect against data loss, however this can have an unintended side effect of blocking the job from moving if failures are minor and persistent. NOTE: This currently defaults to false due to the side effects of the behavior, at some point in the future it is expected this will default to true.| Boolean | optional, defaults to `false` | | use_commit_sync | Use commit sync instead of async (usually not recommended) | Boolean | optional, defaults to `false` | | wait | How long to wait for a full chunk of data to be available. Specified in milliseconds if you use a number. 
| String/Duration/Number | optional, defaults to `30 seconds` | From f387d4c0b2639289864c6ec510ec55b4fca42ebf Mon Sep 17 00:00:00 2001 From: busma13 Date: Fri, 22 Nov 2024 16:35:24 -0700 Subject: [PATCH 2/5] remove partition_assignment_strategy option from kafka_dead_letter --- asset/src/kafka_dead_letter/api.ts | 11 ++--------- asset/src/kafka_dead_letter/interfaces.ts | 5 ----- asset/src/kafka_dead_letter/schema.ts | 5 ----- docs/apis/kafka_dead_letter.md | 1 - 4 files changed, 2 insertions(+), 20 deletions(-) diff --git a/asset/src/kafka_dead_letter/api.ts b/asset/src/kafka_dead_letter/api.ts index 03a220e7..fbc32908 100644 --- a/asset/src/kafka_dead_letter/api.ts +++ b/asset/src/kafka_dead_letter/api.ts @@ -78,7 +78,7 @@ export default class KafkaDeadLetter extends OperationAPI } private clientConfig() { - const config = { + return { type: 'kafka', endpoint: this.apiConfig.connection, options: { @@ -93,14 +93,7 @@ export default class KafkaDeadLetter extends OperationAPI 'log.connection.close': false } as Record, autoconnect: false - }; - - const assignmentStrategy = this.apiConfig.partition_assignment_strategy; - if (assignmentStrategy) { - config.rdkafka_options['partition.assignment.strategy'] = assignmentStrategy; - } - - return config as ConnectionConfig; + } as ConnectionConfig; } private async createClient(): Promise { diff --git a/asset/src/kafka_dead_letter/interfaces.ts b/asset/src/kafka_dead_letter/interfaces.ts index f79f137c..700869a8 100644 --- a/asset/src/kafka_dead_letter/interfaces.ts +++ b/asset/src/kafka_dead_letter/interfaces.ts @@ -34,9 +34,4 @@ export interface KafkaDeadLetterConfig extends APIConfig { Set to -1 to disable polling. */ metadata_refresh: number; - /** - * Name of partition assignment strategy to use - * when elected group leader assigns partitions to group members. - */ - partition_assignment_strategy?: 'range' | 'roundrobin'; } diff --git a/asset/src/kafka_dead_letter/schema.ts b/asset/src/kafka_dead_letter/schema.ts index 68457fce..e3325d09 100644 --- a/asset/src/kafka_dead_letter/schema.ts +++ b/asset/src/kafka_dead_letter/schema.ts @@ -43,11 +43,6 @@ export default class Schema extends ConvictSchema { doc: 'How often the producer will poll the broker for metadata information. Set to -1 to disable polling.', default: 300000, format: Number - }, - partition_assignment_strategy: { - doc: 'Name of partition assignment strategy to use when elected group leader assigns partitions to group members.', - default: '', - format: ['range', 'roundrobin', ''] } }; } diff --git a/docs/apis/kafka_dead_letter.md b/docs/apis/kafka_dead_letter.md index ff731e2b..f5084867 100644 --- a/docs/apis/kafka_dead_letter.md +++ b/docs/apis/kafka_dead_letter.md @@ -96,5 +96,4 @@ are sent to topic "failed_record_topic" at the end of the slice | wait | How long to wait for size messages to become available on the producer, in milliseconds. | String/Duration/Number | optional, defaults to `500` | | connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config | | metadata_refresh | How often the producer will poll the broker for metadata information. Set to -1 to disable polling. | String/Duration/Number | optional, defaults to `"5 minutes"` | -| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. 
May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` | | _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` | From 106a42bbd1e06b0e7a310ca01f47b15b90c32ac2 Mon Sep 17 00:00:00 2001 From: busma13 Date: Wed, 27 Nov 2024 13:38:47 -0700 Subject: [PATCH 3/5] remove partition_assignment_strategy option from kafka_sender and kafka_sender_api --- asset/src/kafka_sender/interfaces.ts | 5 ----- asset/src/kafka_sender_api/api.ts | 12 ++---------- asset/src/kafka_sender_api/schema.ts | 5 ----- docs/apis/kafka_sender_api.md | 1 - docs/operations/kafka_sender.md | 16 +++++++++++----- 5 files changed, 13 insertions(+), 26 deletions(-) diff --git a/asset/src/kafka_sender/interfaces.ts b/asset/src/kafka_sender/interfaces.ts index 71ec3f2e..429d36f4 100644 --- a/asset/src/kafka_sender/interfaces.ts +++ b/asset/src/kafka_sender/interfaces.ts @@ -38,11 +38,6 @@ export interface KafkaSenderConfig extends OpConfig { Set to -1 to disable polling. */ metadata_refresh: number; - /** - * Name of partition assignment strategy to use when elected - * group leader assigns partitions to group members. - */ - partition_assignment_strategy?: 'range' | 'roundrobin'; /** * This field indicates the number of acknowledgements the leader broker * must receive from ISR brokers before responding to the request: diff --git a/asset/src/kafka_sender_api/api.ts b/asset/src/kafka_sender_api/api.ts index 68ffbb6f..3f4cc83b 100644 --- a/asset/src/kafka_sender_api/api.ts +++ b/asset/src/kafka_sender_api/api.ts @@ -22,7 +22,6 @@ export default class KafkaSenderApi extends APIFactory, autoconnect: false - }; - - const assignmentStrategy = kafkaConfig.partition_assignment_strategy; - if (assignmentStrategy) { - config.rdkafka_options['partition.assignment.strategy'] = assignmentStrategy; - } - - return config as ConnectionConfig; + } as ConnectionConfig; } async create( diff --git a/asset/src/kafka_sender_api/schema.ts b/asset/src/kafka_sender_api/schema.ts index e15e6cdc..f97cd1ef 100644 --- a/asset/src/kafka_sender_api/schema.ts +++ b/asset/src/kafka_sender_api/schema.ts @@ -81,11 +81,6 @@ export const schema = { default: '5 minutes', format: 'duration' }, - partition_assignment_strategy: { - doc: 'Name of partition assignment strategy to use when elected group leader assigns partitions to group members.', - default: '', - format: ['range', 'roundrobin', ''] - }, required_acks: { doc: 'The number of required broker acknowledgements for a given request, set to -1 for all.', default: 1, diff --git a/docs/apis/kafka_sender_api.md b/docs/apis/kafka_sender_api.md index c6809383..f3624921 100644 --- a/docs/apis/kafka_sender_api.md +++ b/docs/apis/kafka_sender_api.md @@ -205,5 +205,4 @@ await api.send([ | connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config | | required_acks | The number of required broker acknowledgements for a given request, set to -1 for all. | Number | optional, defaults to `1` | | metadata_refresh | How often the producer will poll the broker for metadata information. Set to -1 to disable polling. | String/Duration/Number | optional, defaults to `"5 minutes"` | -| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. 
May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` | | api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api`|| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` | diff --git a/docs/operations/kafka_sender.md b/docs/operations/kafka_sender.md index edfc1936..baf13276 100644 --- a/docs/operations/kafka_sender.md +++ b/docs/operations/kafka_sender.md @@ -1,4 +1,5 @@ # kafka_sender + The kafka_sender is used to send data to a kafka topic. This is a high throughput operation. This uses [node-rdkafka](https://github.com/Blizzard/node-rdkafka) underneath the hood. @@ -8,9 +9,11 @@ For this sender to function properly, you will need a running kafka cluster and ## Usage ### Send data to topic, use key and time from fields on record + In this example, the kafka_sender will send data to the kafka-test-sender topic using the uuid field of the record. It will also annotate the kafka record timestamp metadata with the date specified on the created field on the record. Example job + ```json { "name": "test-job", @@ -64,9 +67,11 @@ results === data; ``` ### Send data to topic, use _key metadata and create its own timestamp -In this example, the kafka_sender will send data to the kafka-test-sender topic using the _key metadata value, which happens when the `id_field` is not set. It will also annotate the kafka record timestamp metadata with a new date at processing time. + +In this example, the kafka_sender will send data to the kafka-test-sender topic using the_key metadata value, which happens when the `id_field` is not set. It will also annotate the kafka record timestamp metadata with a new date at processing time. Example job + ```json { "name": "test-job", @@ -133,19 +138,20 @@ results === data; | connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config | | required_acks | The number of required broker acknowledgements for a given request, set to -1 for all. | Number | optional, defaults to `1` | | metadata_refresh | How often the producer will poll the broker for metadata information. Set to -1 to disable polling. | String/Duration/Number | optional, defaults to `"5 minutes"` | -| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` | -| api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api`|| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` | +| api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api`| +| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. 
May be set to `json` or `raw` | String | optional, defaults to `json` | | _dead_letter_action | action will specify what to do when failing to parse or transform a record. It may be set to `throw`, `log` or `none`. If none of the actions are specified it will try and use a registered Dead Letter Queue API under that name.The API must be already be created by a operation before it can used. | String | optional, defaults to `throw` | +### API usage -#### API usage In kafka_assets v3, many core components were made into teraslice apis. When you use an kafka processor it will automatically setup the api for you, but if you manually specify the api, then there are restrictions on what configurations you can put on the operation so that clashing of configurations are minimized. The api configs take precedence. If submitting the job in long form, here is a list of parameters that will throw an error if also specified on the opConfig, since these values should be placed on the api: -- `topic` +- `topic` `SHORT FORM (no api specified)` + ```json { "name": "test-job", From d48316f66f8dea4e5c222785bc64334a5eb55557 Mon Sep 17 00:00:00 2001 From: busma13 Date: Wed, 27 Nov 2024 13:39:44 -0700 Subject: [PATCH 4/5] set minimum_teraslice_version in asset.json to 2.9.0 --- asset/asset.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asset/asset.json b/asset/asset.json index 4dfdcdc4..dfbec797 100644 --- a/asset/asset.json +++ b/asset/asset.json @@ -1,5 +1,6 @@ { "name": "kafka", "description": "Kafka reader and writer support.", - "version": "5.3.1" + "version": "5.3.1", + "minimum_teraslice_version": "2.9.0" } From 5fa79d555e48f161a43f4c781fe3897105bd9c8c Mon Sep 17 00:00:00 2001 From: busma13 Date: Wed, 27 Nov 2024 13:42:56 -0700 Subject: [PATCH 5/5] bump: (minor) kafka-assets@5.4.0, kafka-asset-bundle@5.4.0 --- asset/asset.json | 2 +- asset/package.json | 2 +- package.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/asset/asset.json b/asset/asset.json index dfbec797..1f855415 100644 --- a/asset/asset.json +++ b/asset/asset.json @@ -1,6 +1,6 @@ { "name": "kafka", "description": "Kafka reader and writer support.", - "version": "5.3.1", + "version": "5.4.0", "minimum_teraslice_version": "2.9.0" } diff --git a/asset/package.json b/asset/package.json index 947cc903..3056ba6a 100644 --- a/asset/package.json +++ b/asset/package.json @@ -1,7 +1,7 @@ { "name": "kafka-assets", "displayName": "Asset", - "version": "5.3.1", + "version": "5.4.0", "private": true, "description": "Teraslice asset for kafka operations", "license": "MIT", diff --git a/package.json b/package.json index e64c6a08..8b8f71a0 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "kafka-asset-bundle", "displayName": "Kafka Asset Bundle", - "version": "5.3.1", + "version": "5.4.0", "private": true, "description": "A bundle of Kafka operations and processors for Teraslice", "repository": "git@github.com:terascope/kafka-assets.git",
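
Note on the option added in PATCH 1/5: `partition_assignment_strategy` is a consumer-group setting, which is consistent with PATCH 2/5 and 3/5 removing the same option from the producer-side `kafka_dead_letter` and `kafka_sender`/`kafka_sender_api` clients, where the removed code only copied the value into `rdkafka_options['partition.assignment.strategy']`. A minimal sketch of a reader op config using the new value follows; it is illustrative only, not part of the patch series: the import path is assumed, and the remaining required reader fields (topic, group, size, and so on) are omitted because they are not shown in these diffs.

```typescript
import { KafkaReaderConfig } from './kafka_reader/interfaces';

// Fragment of a kafka_reader op config opting into cooperative (incremental)
// rebalancing. Typed as Partial<> because only fields touched by this patch
// series are shown; a real job also needs topic, group, size, etc.
const readerConfigFragment: Partial<KafkaReaderConfig> = {
    _op: 'kafka_reader',
    partition_assignment_strategy: 'cooperative-sticky'
};
```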