diff --git a/packages/destination-actions/package.json b/packages/destination-actions/package.json index a470d85a6f..06feb99dbf 100644 --- a/packages/destination-actions/package.json +++ b/packages/destination-actions/package.json @@ -52,6 +52,7 @@ "dayjs": "^1.10.7", "escape-goat": "^3", "google-libphonenumber": "^3.2.31", + "kafkajs": "^2.2.4", "liquidjs": "^9.37.0", "lodash": "^4.17.21", "ssh2-sftp-client": "^9.1.0" diff --git a/packages/destination-actions/src/destinations/kafka/generated-types.ts b/packages/destination-actions/src/destinations/kafka/generated-types.ts new file mode 100644 index 0000000000..7b4562681d --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/generated-types.ts @@ -0,0 +1,28 @@ +// Generated file. DO NOT MODIFY IT BY HAND. + +export interface Settings { + /** + * The brokers for your Kafka instance, in the format of `host:port`. Accepts a comma delimited string. + */ + brokers: string + /** + * The SASL Authentication Mechanism for your Kafka instance. + */ + mechanism: string + /** + * The client ID for your Kafka instance. Defaults to "segment-actions-kafka-producer". + */ + clientId: string + /** + * The username for your Kafka instance. + */ + username: string + /** + * The password for your Kafka instance. + */ + password: string + /** + * The partitioner type for your Kafka instance. Defaults to "Default Partitioner". + */ + partitionerType: string +} diff --git a/packages/destination-actions/src/destinations/kafka/index.ts b/packages/destination-actions/src/destinations/kafka/index.ts new file mode 100644 index 0000000000..a0a1d8b6e2 --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/index.ts @@ -0,0 +1,70 @@ +import type { DestinationDefinition } from '@segment/actions-core' +import type { Settings } from './generated-types' + +import send from './send' + +const destination: DestinationDefinition = { + name: 'Kafka', + slug: 'actions-kafka', + mode: 'cloud', + description: 'Send data to a Kafka topic', + authentication: { + scheme: 'custom', + fields: { + brokers: { + label: 'Brokers', + description: + 'The brokers for your Kafka instance, in the format of `host:port`. Accepts a comma delimited string.', + type: 'string', + required: true + }, + mechanism: { + label: 'SASL Authentication Mechanism', + description: 'The SASL Authentication Mechanism for your Kafka instance.', + type: 'string', + required: true, + choices: [ + { label: 'Plain', value: 'plain' }, + { label: 'SCRAM/SHA-256', value: 'scram-sha-256' }, + { label: 'SCRAM/SHA-512', value: 'scram-sha-512' } + ], + default: 'plain' + }, + clientId: { + label: 'Client ID', + description: 'The client ID for your Kafka instance. Defaults to "segment-actions-kafka-producer".', + type: 'string', + required: true, + default: 'segment-actions-kafka-producer' + }, + username: { + label: 'Username', + description: 'The username for your Kafka instance.', + type: 'string', + required: true + }, + password: { + label: 'Password', + description: 'The password for your Kafka instance.', + type: 'password', + required: true + }, + partitionerType: { + label: 'Partitioner Type', + description: 'The partitioner type for your Kafka instance. Defaults to "Default Partitioner".', + type: 'string', + required: true, + choices: [ + { label: 'Default Partitioner', value: 'DefaultPartitioner' }, + { label: 'Legacy Partitioner', value: 'LegacyPartitioner' } + ], + default: 'DefaultPartitioner' + } + } + }, + actions: { + send + } +} + +export default destination diff --git a/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts b/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts new file mode 100644 index 0000000000..e7ee83f1b7 --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts @@ -0,0 +1,102 @@ +import { createTestIntegration } from '@segment/actions-core' +import Destination from '../../index' +import { Kafka, KafkaConfig, Partitioners } from 'kafkajs' + +const testDestination = createTestIntegration(Destination) + +jest.mock('kafkajs', () => { + const mockProducer = { + connect: jest.fn(), + send: jest.fn(), + disconnect: jest.fn() + } + + const mockKafka = { + producer: jest.fn(() => mockProducer) + } + + return { + Kafka: jest.fn(() => mockKafka), + Producer: jest.fn(() => mockProducer), + Partitioners: { + LegacyPartitioner: jest.fn(), + DefaultPartitioner: jest.fn() + } + } +}) + +const testData = { + event: { + type: 'track', + event: 'Test Event', + properties: { + email: 'test@iterable.com' + }, + traits: {}, + timestamp: '2024-02-26T16:53:08.910Z', + sentAt: '2024-02-26T16:53:08.910Z', + receivedAt: '2024-02-26T16:53:08.907Z', + messageId: 'a82f52d9-d8ed-40a8-89e3-b9c04701a5f6', + userId: 'user1234', + anonymousId: 'anonId1234', + context: {} + }, + useDefaultMappings: false, + settings: { + brokers: 'yourBroker', + clientId: 'yourClientId', + mechanism: 'plain', + username: 'yourUsername', + password: 'yourPassword', + partitionerType: 'DefaultPartitioner' + }, + mapping: { + topic: 'test-topic', + payload: { '@path': '$.' } + } +} + +describe('Kafka.send', () => { + it('kafka library is initialized correctly', async () => { + await testDestination.testAction('send', testData as any) + + expect(Kafka).toHaveBeenCalledWith( + expect.objectContaining({ + clientId: 'yourClientId', + brokers: ['yourBroker'], + ssl: true, + sasl: { + mechanism: 'plain', + username: 'yourUsername', + password: 'yourPassword' + } + }) + ) + }) + + it('kafka producer is initialized correctly', async () => { + await testDestination.testAction('send', testData as any) + + expect(new Kafka({} as KafkaConfig).producer).toBeCalledWith({ + createPartitioner: Partitioners.DefaultPartitioner + }) + }) + + it('kafka.producer() send() is called with the correct payload', async () => { + await testDestination.testAction('send', testData as any) + + expect(new Kafka({} as KafkaConfig).producer().send).toBeCalledWith({ + topic: 'test-topic', + messages: [ + { + value: + '{"anonymousId":"anonId1234","context":{},"event":"Test Event","messageId":"a82f52d9-d8ed-40a8-89e3-b9c04701a5f6","properties":{"email":"test@iterable.com"},"receivedAt":"2024-02-26T16:53:08.907Z","sentAt":"2024-02-26T16:53:08.910Z","timestamp":"2024-02-26T16:53:08.910Z","traits":{},"type":"track","userId":"user1234"}', + key: undefined, + headers: undefined, + partition: undefined, + partitionerType: 'DefaultPartitioner' + } + ] + }) + }) +}) diff --git a/packages/destination-actions/src/destinations/kafka/send/generated-types.ts b/packages/destination-actions/src/destinations/kafka/send/generated-types.ts new file mode 100644 index 0000000000..1883e782bd --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/send/generated-types.ts @@ -0,0 +1,32 @@ +// Generated file. DO NOT MODIFY IT BY HAND. + +export interface Payload { + /** + * The Kafka topic to send messages to. This field auto-populates from your Kafka instance. + */ + topic: string + /** + * The data to send to Kafka + */ + payload: { + [k: string]: unknown + } + /** + * Header data to send to Kafka. Format is Header key, Header value (optional). + */ + headers?: { + [k: string]: unknown + } + /** + * The partition to send the message to (optional) + */ + partition?: number + /** + * The default partition to send the message to (optional) + */ + default_partition?: number + /** + * The key for the message (optional) + */ + key?: string +} diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts new file mode 100644 index 0000000000..9df3ce20e7 --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/send/index.ts @@ -0,0 +1,60 @@ +import type { ActionDefinition } from '@segment/actions-core' +import type { Settings } from '../generated-types' +import type { Payload } from './generated-types' +import { getTopics, sendData } from '../utils' + +const action: ActionDefinition = { + title: 'Send', + description: 'Send data to a Kafka topic', + defaultSubscription: 'type = "track" or type = "identify" or type = "page" or type = "screen" or type = "group"', + fields: { + topic: { + label: 'Topic', + description: 'The Kafka topic to send messages to. This field auto-populates from your Kafka instance.', + type: 'string', + required: true, + dynamic: true + }, + payload: { + label: 'Payload', + description: 'The data to send to Kafka', + type: 'object', + required: true, + default: { '@path': '$.' } + }, + headers: { + label: 'Headers', + description: 'Header data to send to Kafka. Format is Header key, Header value (optional).', + type: 'object', + defaultObjectUI: 'keyvalue:only' + }, + partition: { + label: 'Partition', + description: 'The partition to send the message to (optional)', + type: 'integer' + }, + default_partition: { + label: 'Default Partition', + description: 'The default partition to send the message to (optional)', + type: 'integer' + }, + key: { + label: 'Message Key', + description: 'The key for the message (optional)', + type: 'string' + } + }, + dynamicFields: { + topic: async (_, { settings }) => { + return getTopics(settings) + } + }, + perform: async (_request, { settings, payload }) => { + await sendData(settings, [payload]) + }, + performBatch: async (_request, { settings, payload }) => { + await sendData(settings, payload) + } +} + +export default action diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts new file mode 100644 index 0000000000..4d2000d222 --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/utils.ts @@ -0,0 +1,84 @@ +import { Kafka, SASLOptions, ProducerRecord, Partitioners } from 'kafkajs' +import type { DynamicFieldResponse } from '@segment/actions-core' +import type { Settings } from './generated-types' +import type { Payload } from './send/generated-types' + +export const DEFAULT_PARTITIONER = 'DefaultPartitioner' +export const LEGACY_PARTITIONER = 'LegacyPartitioner' + +interface Message { + value: string + key?: string + headers?: { [key: string]: string } + partition?: number + partitionerType?: typeof LEGACY_PARTITIONER | typeof DEFAULT_PARTITIONER +} +interface TopicMessages { + topic: string + messages: Message[] +} + +export const getTopics = async (settings: Settings): Promise => { + const kafka = getKafka(settings) + const admin = kafka.admin() + await admin.connect() + const topics = await admin.listTopics() + await admin.disconnect() + return { choices: topics.map((topic) => ({ label: topic, value: topic })) } +} + +const getKafka = (settings: Settings) => { + return new Kafka({ + clientId: settings.clientId, + brokers: settings.brokers.trim().split(',').map(broker => broker.trim()), + ssl: true, + sasl: { + mechanism: settings.mechanism, + username: settings.username, + password: settings.password + } as SASLOptions + }) +} + +const getProducer = (settings: Settings) => { + return getKafka(settings).producer({ + createPartitioner: + settings.partitionerType === LEGACY_PARTITIONER + ? Partitioners.LegacyPartitioner + : Partitioners.DefaultPartitioner + }) +} + +export const sendData = async (settings: Settings, payload: Payload[]) => { + const groupedPayloads: { [topic: string]: Payload[] } = {} + + payload.forEach((p) => { + const { topic } = p + if (!groupedPayloads[topic]) { + groupedPayloads[topic] = [] + } + groupedPayloads[topic].push(p) + }) + + const topicMessages: TopicMessages[] = Object.keys(groupedPayloads).map((topic) => ({ + topic, + messages: groupedPayloads[topic].map((payload) => ({ + value: JSON.stringify(payload.payload), + key: payload.key, + headers: payload?.headers ?? undefined, + partition: payload?.partition ?? payload?.default_partition ?? undefined, + partitionerType: settings.partitionerType + }) as Message) + })) + + const producer = getProducer(settings) + + await producer.connect() + + for (const data of topicMessages) { + await producer.send(data as ProducerRecord) + } + + await producer.disconnect() + +} diff --git a/yarn.lock b/yarn.lock index b9cc34ba4b..91a80a028c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4862,7 +4862,7 @@ ansi-html-community@^0.0.8: ansi-regex@5.0.1, ansi-regex@^2.0.0, ansi-regex@^2.1.1, ansi-regex@^3.0.0, ansi-regex@^5.0.0, ansi-regex@^5.0.1, ansi-regex@^6.0.1: version "5.0.1" - resolved "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz#082cb2c89c9fe8659a311a53bd6a4dc5301db304" + resolved "https://registry.yarnpkg.com/ansi-regex/-/ansi-regex-5.0.1.tgz#082cb2c89c9fe8659a311a53bd6a4dc5301db304" integrity sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ== ansi-styles@^2.2.1: @@ -7369,7 +7369,7 @@ dot-prop@6.0.1: dot-prop@^4.2.1: version "4.2.1" - resolved "https://registry.npmjs.org/dot-prop/-/dot-prop-4.2.1.tgz#45884194a71fc2cda71cbb4bceb3a4dd2f433ba4" + resolved "https://registry.yarnpkg.com/dot-prop/-/dot-prop-4.2.1.tgz#45884194a71fc2cda71cbb4bceb3a4dd2f433ba4" integrity sha512-l0p4+mIuJIua0mhxGoh4a+iNL9bmeK5DvnSVQa6T0OhrVmaEa1XScX5Etc673FePCJOArq/4Pa2cLGODUWTPOQ== dependencies: is-obj "^1.0.0" @@ -8948,7 +8948,7 @@ glob-parent@5.1.2, glob-parent@^5.1.2, glob-parent@~5.1.2: glob-parent@^6.0.1: version "6.0.2" - resolved "https://registry.npmjs.org/glob-parent/-/glob-parent-6.0.2.tgz#6d237d99083950c79290f24c7642a3de9a28f9e3" + resolved "https://registry.yarnpkg.com/glob-parent/-/glob-parent-6.0.2.tgz#6d237d99083950c79290f24c7642a3de9a28f9e3" integrity sha512-XxwI8EOhVQgWp6iDL+3b0r86f4d6AX6zSU55HfB4ydCEuXLXc5FcYeOu+nnGftS4TEju/11rt4KJPTMgbfmv4A== dependencies: is-glob "^4.0.3" @@ -10075,7 +10075,7 @@ is-number@^7.0.0: is-obj@^1.0.0, is-obj@^1.0.1: version "1.0.1" - resolved "https://registry.npmjs.org/is-obj/-/is-obj-1.0.1.tgz#3e4729ac1f5fde025cd7d83a896dab9f4f67db0f" + resolved "https://registry.yarnpkg.com/is-obj/-/is-obj-1.0.1.tgz#3e4729ac1f5fde025cd7d83a896dab9f4f67db0f" integrity sha512-l4RyHgRqGN4Y3+9JHVrNqO+tN0rV5My76uW5/nuO4K1b6vw5G8d/cmFjP9tRfEsdhZNt0IFdZuK/c2Vr4Nb+Qg== is-obj@^2.0.0: @@ -11219,6 +11219,11 @@ just-diff@^6.0.0: resolved "https://registry.yarnpkg.com/just-diff/-/just-diff-6.0.2.tgz#03b65908543ac0521caf6d8eb85035f7d27ea285" integrity sha512-S59eriX5u3/QhMNq3v/gm8Kd0w8OS6Tz2FS1NG4blv+z0MuQcBRJyFWjdovM0Rad4/P4aUPFtnkNjMjyMlMSYA== +kafkajs@^2.2.4: + version "2.2.4" + resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.2.4.tgz#59e6e16459d87fdf8b64be73970ed5aa42370a5b" + integrity sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA== + karma-chrome-launcher@^3.1.1: version "3.1.1" resolved "https://registry.yarnpkg.com/karma-chrome-launcher/-/karma-chrome-launcher-3.1.1.tgz#baca9cc071b1562a1db241827257bfe5cab597ea"