remove partition_assignment_strategy option from kafka_sender and kafka_sender_api
busma13 committed Dec 6, 2024
1 parent f387d4c commit 106a42b
Showing 5 changed files with 13 additions and 26 deletions.
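For jobs written against earlier versions, upgrading past this commit only requires dropping the removed field from the `kafka_sender` opConfig; the remaining options are unchanged. A hypothetical opConfig after the change (the topic and connection names are illustrative, and only fields documented in the tables below are used):

```json
{
    "_op": "kafka_sender",
    "topic": "kafka-test-sender",
    "connection": "default",
    "required_acks": 1,
    "metadata_refresh": "5 minutes"
}
```

Supplying `partition_assignment_strategy` after this change has no effect (or may fail schema validation, depending on strictness), since the option no longer exists on the sender.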
5 changes: 0 additions & 5 deletions asset/src/kafka_sender/interfaces.ts
@@ -38,11 +38,6 @@ export interface KafkaSenderConfig extends OpConfig {
Set to -1 to disable polling.
*/
metadata_refresh: number;
/**
* Name of partition assignment strategy to use when elected
* group leader assigns partitions to group members.
*/
partition_assignment_strategy?: 'range' | 'roundrobin';
/**
* This field indicates the number of acknowledgements the leader broker
* must receive from ISR brokers before responding to the request:
12 changes: 2 additions & 10 deletions asset/src/kafka_sender_api/api.ts
@@ -22,7 +22,6 @@ export default class KafkaSenderApi extends APIFactory<KafkaRouteSender, KafkaSe
if (isNotNil(config.id_field) && !isString(config.id_field)) throw new Error(`Parameter id_field must be provided and be of type string, got ${getTypeOf(config.id_field)}`);
if (isNotNil(config.timestamp_field) && !isString(config.timestamp_field)) throw new Error(`Parameter timestamp_field must be provided and be of type string, got ${getTypeOf(config.timestamp_field)}`);
if (isNotNil(config.timestamp_now) && !isBoolean(config.timestamp_now)) throw new Error(`Parameter timestamp_now must be provided and be of type boolean, got ${getTypeOf(config.timestamp_now)}`);
if (isNil(config.partition_assignment_strategy) || !isString(config.partition_assignment_strategy)) throw new Error(`Parameter partition_assignment_strategy must be provided and be of type string, got ${getTypeOf(config.partition_assignment_strategy)}`);
if (isNil(config.compression) || !isString(config.compression)) throw new Error(`Parameter compression must be provided and be of type string, got ${getTypeOf(config.compression)}`);
if (isNil(config.wait) || !isNumber(config.wait)) throw new Error(`Parameter wait must be provided and be of type number, got ${getTypeOf(config.wait)}`);
if (isNil(config.metadata_refresh) || !isNumber(config.metadata_refresh)) throw new Error(`Parameter metadata_refresh must be provided and be of type number, got ${getTypeOf(config.metadata_refresh)}`);
@@ -40,7 +39,7 @@ export default class KafkaSenderApi extends APIFactory<KafkaRouteSender, KafkaSe

private clientConfig(clientConfig: KafkaSenderAPIConfig = {}) {
const kafkaConfig = Object.assign({}, this.apiConfig, clientConfig);
const config = {
return {
type: 'kafka',
endpoint: kafkaConfig.connection,
options: {
@@ -59,14 +58,7 @@ export default class KafkaSenderApi extends APIFactory<KafkaRouteSender, KafkaSe
'request.required.acks': kafkaConfig.required_acks
} as Record<string, any>,
autoconnect: false
};

const assignmentStrategy = kafkaConfig.partition_assignment_strategy;
if (assignmentStrategy) {
config.rdkafka_options['partition.assignment.strategy'] = assignmentStrategy;
}

return config as ConnectionConfig;
} as ConnectionConfig;
}
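The refactor in this hunk replaces a mutable local (`const config = ...` plus a conditional mutation for the removed strategy option) with a direct `return` of the object literal. The merge behaviour is untouched: `Object.assign({}, this.apiConfig, clientConfig)` still lets per-call values override the stored api config. A minimal, self-contained sketch of that precedence (field values are invented for illustration, not taken from the module):

```typescript
// Illustrative sketch, not the actual KafkaSenderApi module.
// Object.assign gives later sources precedence, so per-call
// overrides win over the stored api config, and fields absent
// from the override fall through from the api config.
const apiConfig = { topic: 'topic-from-api', compression: 'gzip', wait: 500 };
const clientOverride = { topic: 'topic-from-call' };

const kafkaConfig = Object.assign({}, apiConfig, clientOverride);

console.log(kafkaConfig.topic);       // topic-from-call
console.log(kafkaConfig.compression); // gzip
```

Returning the literal directly also lets the `as ConnectionConfig` assertion sit on the expression itself, rather than on a separate `return config` statement.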

async create(
5 changes: 0 additions & 5 deletions asset/src/kafka_sender_api/schema.ts
@@ -81,11 +81,6 @@ export const schema = {
default: '5 minutes',
format: 'duration'
},
partition_assignment_strategy: {
doc: 'Name of partition assignment strategy to use when elected group leader assigns partitions to group members.',
default: '',
format: ['range', 'roundrobin', '']
},
required_acks: {
doc: 'The number of required broker acknowledgements for a given request, set to -1 for all.',
default: 1,
1 change: 0 additions & 1 deletion docs/apis/kafka_sender_api.md
@@ -205,5 +205,4 @@ await api.send([
| connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config |
| required_acks | The number of required broker acknowledgements for a given request, set to -1 for all. | Number | optional, defaults to `1` |
| metadata_refresh | How often the producer will poll the broker for metadata information. Set to -1 to disable polling. | String/Duration/Number | optional, defaults to `"5 minutes"` |
| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` |
| api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api` |
| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` |
16 changes: 11 additions & 5 deletions docs/operations/kafka_sender.md
@@ -1,4 +1,5 @@
# kafka_sender

The kafka_sender is used to send data to a kafka topic. This is a high throughput operation.

This uses [node-rdkafka](https://github.com/Blizzard/node-rdkafka) underneath the hood.
Expand All @@ -8,9 +9,11 @@ For this sender to function properly, you will need a running kafka cluster and
## Usage

### Send data to topic, use key and time from fields on record

In this example, the kafka_sender will send data to the kafka-test-sender topic using the uuid field of the record. It will also annotate the kafka record timestamp metadata with the date specified on the created field on the record.

Example job

```json
{
"name": "test-job",
@@ -64,9 +67,11 @@ results === data;
```

### Send data to topic, use _key metadata and create its own timestamp
In this example, the kafka_sender will send data to the kafka-test-sender topic using the _key metadata value, which happens when the `id_field` is not set. It will also annotate the kafka record timestamp metadata with a new date at processing time.

In this example, the kafka_sender will send data to the kafka-test-sender topic using the _key metadata value, which happens when the `id_field` is not set. It will also annotate the kafka record timestamp metadata with a new date at processing time.

Example job

```json
{
"name": "test-job",
@@ -133,19 +138,20 @@ results === data;
| connection | Name of the kafka connection to use when sending data | String | optional, defaults to the 'default' connection in the kafka terafoundation connector config |
| required_acks | The number of required broker acknowledgements for a given request, set to -1 for all. | Number | optional, defaults to `1` |
| metadata_refresh | How often the producer will poll the broker for metadata information. Set to -1 to disable polling. | String/Duration/Number | optional, defaults to `"5 minutes"` |
| partition_assignment_strategy | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. May be set to `range`, `roundrobin` or `""` | String | optional, defaults to `""` |
| api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api`|| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` |
| api_name | Name of `kafka_sender_api` used for the sender, if none is provided, then one is made and assigned the name to `kafka_sender_api`, and is injected into the execution | String | optional, defaults to `kafka_sender_api`|
| _encoding | Used for specifying the data encoding type when using DataEntity.fromBuffer. May be set to `json` or `raw` | String | optional, defaults to `json` |
| _dead_letter_action | Specifies what to do when failing to parse or transform a record. It may be set to `throw`, `log` or `none`. If a value other than these actions is given, it will try to use a registered Dead Letter Queue API under that name. The API must already be created by an operation before it can be used. | String | optional, defaults to `throw` |

### API usage

#### API usage
In kafka_assets v3, many core components were made into teraslice apis. When you use a kafka processor it will automatically set up the api for you, but if you manually specify the api, then there are restrictions on what configurations you can put on the operation so that clashing of configurations is minimized. The api configs take precedence.

If submitting the job in long form, here is a list of parameters that will throw an error if also specified on the opConfig, since these values should be placed on the api:
- `topic`

- `topic`
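A hedged sketch of the long form, with `topic` moved onto the api as the restriction above requires (the names mirror the short-form example below; the exact schema should be checked against the kafka_sender_api docs):

```json
{
    "name": "test-job",
    "apis": [
        {
            "_name": "kafka_sender_api",
            "topic": "kafka-test-sender",
            "connection": "default"
        }
    ],
    "operations": [
        { "_op": "test-reader" },
        { "_op": "kafka_sender", "api_name": "kafka_sender_api" }
    ]
}
```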

`SHORT FORM (no api specified)`

```json
{
"name": "test-job",
