Kafka destination (segmentio#1892)
* Initial commit for Kafka Action Destination

* Updating settings.

* Updating authentication test.

* Adding message key and payload fields for event mapping.

* Adding a mock for KafkaJS, based on https://github.com/Wei-Zou/jest-mock-kafkajs/blob/master/__mocks__/kafkajs.js.

* Stubbing unit tests and mocks.

* Stubbing some tests.

* Extra adjustments: removing unused auth mechanisms.

* minor changes

* minor change

* adding list topics

* reorder fields

* refactor and adding batch

* adding partitioning

* more functionality

* adding dynamic dropdown

* adding tests

* refactor from PR feedback

* fixing broker type

* updating tests

---------

Co-authored-by: joe-ayoub-segment <[email protected]>
seg-leonelsanches and joe-ayoub-segment authored Feb 27, 2024
1 parent 854a9e1 commit 4a7edef
Showing 8 changed files with 386 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/destination-actions/package.json
@@ -52,6 +52,7 @@
"dayjs": "^1.10.7",
"escape-goat": "^3",
"google-libphonenumber": "^3.2.31",
"kafkajs": "^2.2.4",
"liquidjs": "^9.37.0",
"lodash": "^4.17.21",
"ssh2-sftp-client": "^9.1.0"

Some generated files are not rendered by default.

70 changes: 70 additions & 0 deletions packages/destination-actions/src/destinations/kafka/index.ts
@@ -0,0 +1,70 @@
import type { DestinationDefinition } from '@segment/actions-core'
import type { Settings } from './generated-types'

import send from './send'

const destination: DestinationDefinition<Settings> = {
name: 'Kafka',
slug: 'actions-kafka',
mode: 'cloud',
description: 'Send data to a Kafka topic',
authentication: {
scheme: 'custom',
fields: {
brokers: {
label: 'Brokers',
description:
'The brokers for your Kafka instance, in the format of `host:port`. Accepts a comma-delimited string.',
type: 'string',
required: true
},
mechanism: {
label: 'SASL Authentication Mechanism',
description: 'The SASL Authentication Mechanism for your Kafka instance.',
type: 'string',
required: true,
choices: [
{ label: 'Plain', value: 'plain' },
{ label: 'SCRAM/SHA-256', value: 'scram-sha-256' },
{ label: 'SCRAM/SHA-512', value: 'scram-sha-512' }
],
default: 'plain'
},
clientId: {
label: 'Client ID',
description: 'The client ID for your Kafka instance. Defaults to "segment-actions-kafka-producer".',
type: 'string',
required: true,
default: 'segment-actions-kafka-producer'
},
username: {
label: 'Username',
description: 'The username for your Kafka instance.',
type: 'string',
required: true
},
password: {
label: 'Password',
description: 'The password for your Kafka instance.',
type: 'password',
required: true
},
partitionerType: {
label: 'Partitioner Type',
description: 'The partitioner type for your Kafka instance. Defaults to "Default Partitioner".',
type: 'string',
required: true,
choices: [
{ label: 'Default Partitioner', value: 'DefaultPartitioner' },
{ label: 'Legacy Partitioner', value: 'LegacyPartitioner' }
],
default: 'DefaultPartitioner'
}
}
},
actions: {
send
}
}

export default destination
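For reference, a hypothetical `Settings` object matching the authentication fields above. Every value is a placeholder, and the `Settings` type is the generated type referenced in the imports:

// Hypothetical example only: placeholder credentials, not a real configuration.
const exampleSettings: Settings = {
  // Comma-delimited `host:port` pairs; split and trimmed before use (see utils.ts below)
  brokers: 'broker-1.example.com:9092,broker-2.example.com:9092',
  mechanism: 'scram-sha-512', // 'plain' | 'scram-sha-256' | 'scram-sha-512'
  clientId: 'segment-actions-kafka-producer',
  username: 'example-user',
  password: 'example-password',
  partitionerType: 'DefaultPartitioner' // or 'LegacyPartitioner'
}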
@@ -0,0 +1,102 @@
import { createTestIntegration } from '@segment/actions-core'
import Destination from '../../index'
import { Kafka, KafkaConfig, Partitioners } from 'kafkajs'

const testDestination = createTestIntegration(Destination)

jest.mock('kafkajs', () => {
const mockProducer = {
connect: jest.fn(),
send: jest.fn(),
disconnect: jest.fn()
}

const mockKafka = {
producer: jest.fn(() => mockProducer)
}

return {
Kafka: jest.fn(() => mockKafka),
Producer: jest.fn(() => mockProducer),
Partitioners: {
LegacyPartitioner: jest.fn(),
DefaultPartitioner: jest.fn()
}
}
})

const testData = {
event: {
type: 'track',
event: 'Test Event',
properties: {
email: '[email protected]'
},
traits: {},
timestamp: '2024-02-26T16:53:08.910Z',
sentAt: '2024-02-26T16:53:08.910Z',
receivedAt: '2024-02-26T16:53:08.907Z',
messageId: 'a82f52d9-d8ed-40a8-89e3-b9c04701a5f6',
userId: 'user1234',
anonymousId: 'anonId1234',
context: {}
},
useDefaultMappings: false,
settings: {
brokers: 'yourBroker',
clientId: 'yourClientId',
mechanism: 'plain',
username: 'yourUsername',
password: 'yourPassword',
partitionerType: 'DefaultPartitioner'
},
mapping: {
topic: 'test-topic',
payload: { '@path': '$.' }
}
}

describe('Kafka.send', () => {
it('kafka library is initialized correctly', async () => {
await testDestination.testAction('send', testData as any)

expect(Kafka).toHaveBeenCalledWith(
expect.objectContaining({
clientId: 'yourClientId',
brokers: ['yourBroker'],
ssl: true,
sasl: {
mechanism: 'plain',
username: 'yourUsername',
password: 'yourPassword'
}
})
)
})

it('kafka producer is initialized correctly', async () => {
await testDestination.testAction('send', testData as any)

expect(new Kafka({} as KafkaConfig).producer).toBeCalledWith({
createPartitioner: Partitioners.DefaultPartitioner
})
})

it('kafka.producer() send() is called with the correct payload', async () => {
await testDestination.testAction('send', testData as any)

expect(new Kafka({} as KafkaConfig).producer().send).toBeCalledWith({
topic: 'test-topic',
messages: [
{
value:
'{"anonymousId":"anonId1234","context":{},"event":"Test Event","messageId":"a82f52d9-d8ed-40a8-89e3-b9c04701a5f6","properties":{"email":"[email protected]"},"receivedAt":"2024-02-26T16:53:08.907Z","sentAt":"2024-02-26T16:53:08.910Z","timestamp":"2024-02-26T16:53:08.910Z","traits":{},"type":"track","userId":"user1234"}',
key: undefined,
headers: undefined,
partition: undefined,
partitionerType: 'DefaultPartitioner'
}
]
})
})
})
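Note that because the `jest.mock` factory above returns the same `mockKafka` and `mockProducer` singletons from every constructor and `producer()` call, the assertions can construct a fresh `new Kafka({} as KafkaConfig)` and still observe the calls made inside the action under test.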

Some generated files are not rendered by default.

60 changes: 60 additions & 0 deletions packages/destination-actions/src/destinations/kafka/send/index.ts
@@ -0,0 +1,60 @@
import type { ActionDefinition } from '@segment/actions-core'
import type { Settings } from '../generated-types'
import type { Payload } from './generated-types'
import { getTopics, sendData } from '../utils'

const action: ActionDefinition<Settings, Payload> = {
title: 'Send',
description: 'Send data to a Kafka topic',
defaultSubscription: 'type = "track" or type = "identify" or type = "page" or type = "screen" or type = "group"',
fields: {
topic: {
label: 'Topic',
description: 'The Kafka topic to send messages to. This field auto-populates from your Kafka instance.',
type: 'string',
required: true,
dynamic: true
},
payload: {
label: 'Payload',
description: 'The data to send to Kafka',
type: 'object',
required: true,
default: { '@path': '$.' }
},
headers: {
label: 'Headers',
description: 'Header data to send to Kafka. Format is Header key, Header value (optional).',
type: 'object',
defaultObjectUI: 'keyvalue:only'
},
partition: {
label: 'Partition',
description: 'The partition to send the message to (optional)',
type: 'integer'
},
default_partition: {
label: 'Default Partition',
description: 'The default partition to send the message to (optional)',
type: 'integer'
},
key: {
label: 'Message Key',
description: 'The key for the message (optional)',
type: 'string'
}
},
dynamicFields: {
topic: async (_, { settings }) => {
return getTopics(settings)
}
},
perform: async (_request, { settings, payload }) => {
await sendData(settings, [payload])
},
performBatch: async (_request, { settings, payload }) => {
await sendData(settings, payload)
}
}

export default action
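As an illustration, a hypothetical mapping for this action built from the fields above; the topic name, header values, and `@path` expressions are placeholders:

// Hypothetical mapping; field names match the action definition above.
const exampleMapping = {
  topic: 'segment-events', // normally selected from the dynamic dropdown
  payload: { '@path': '$.' }, // the entire Segment event
  key: { '@path': '$.userId' }, // optional message key
  headers: { source: 'segment' }, // optional header key/value pairs
  default_partition: 0 // optional fallback when no partition is set
}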
84 changes: 84 additions & 0 deletions packages/destination-actions/src/destinations/kafka/utils.ts
@@ -0,0 +1,84 @@
import { Kafka, SASLOptions, ProducerRecord, Partitioners } from 'kafkajs'
import type { DynamicFieldResponse } from '@segment/actions-core'
import type { Settings } from './generated-types'
import type { Payload } from './send/generated-types'

export const DEFAULT_PARTITIONER = 'DefaultPartitioner'
export const LEGACY_PARTITIONER = 'LegacyPartitioner'

interface Message {
value: string
key?: string
headers?: { [key: string]: string }
partition?: number
partitionerType?: typeof LEGACY_PARTITIONER | typeof DEFAULT_PARTITIONER
}
interface TopicMessages {
topic: string
messages: Message[]
}

export const getTopics = async (settings: Settings): Promise<DynamicFieldResponse> => {
const kafka = getKafka(settings)
const admin = kafka.admin()
await admin.connect()
const topics = await admin.listTopics()
await admin.disconnect()
return { choices: topics.map((topic) => ({ label: topic, value: topic })) }
}

const getKafka = (settings: Settings) => {
return new Kafka({
clientId: settings.clientId,
brokers: settings.brokers.trim().split(',').map(broker => broker.trim()),
ssl: true,
sasl: {
mechanism: settings.mechanism,
username: settings.username,
password: settings.password
} as SASLOptions
})
}

const getProducer = (settings: Settings) => {
return getKafka(settings).producer({
createPartitioner:
settings.partitionerType === LEGACY_PARTITIONER
? Partitioners.LegacyPartitioner
: Partitioners.DefaultPartitioner
})
}

export const sendData = async (settings: Settings, payload: Payload[]) => {
const groupedPayloads: { [topic: string]: Payload[] } = {}

payload.forEach((p) => {
const { topic } = p
if (!groupedPayloads[topic]) {
groupedPayloads[topic] = []
}
groupedPayloads[topic].push(p)
})

const topicMessages: TopicMessages[] = Object.keys(groupedPayloads).map((topic) => ({
topic,
messages: groupedPayloads[topic].map((payload) => ({
value: JSON.stringify(payload.payload),
key: payload.key,
headers: payload?.headers ?? undefined,
partition: payload?.partition ?? payload?.default_partition ?? undefined,
partitionerType: settings.partitionerType
}) as Message)
}))

const producer = getProducer(settings)

await producer.connect()

for (const data of topicMessages) {
await producer.send(data as ProducerRecord)
}

  await producer.disconnect()
}
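
A minimal usage sketch of `sendData`, assuming a valid `settings` object like the one shown earlier; the payload contents and `as Payload` casts are illustrative. Two payloads with different topics are grouped so the producer issues one `send` per topic:

// Illustrative only; real payloads come from the action's mapped fields.
await sendData(settings, [
  { topic: 'signups', payload: { event: 'Signed Up' } } as Payload,
  { topic: 'purchases', payload: { event: 'Order Completed' }, key: 'user1234' } as Payload
])
// groupedPayloads = { signups: [...], purchases: [...] }, so the producer calls:
// producer.send({ topic: 'signups', messages: [{ value: '{"event":"Signed Up"}', ... }] })
// producer.send({ topic: 'purchases', messages: [{ value: '{"event":"Order Completed"}', key: 'user1234', ... }] })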
