Add topic prefix flag #64

Merged · 3 commits · Mar 7, 2023
15 changes: 15 additions & 0 deletions .github/workflows/integration.yaml
@@ -34,5 +34,20 @@ jobs:
- name: Produce to Kafka from Avro Schema in Avro Format
run: docker exec datagen datagen -s /tests/schema.avsc -f avro -n 3 --record-size 100 -d -w 100

- name: Topic prefix with json
run: docker exec datagen datagen -s /tests/schema.sql -f json -n 3 --record-size 100 -d --prefix test

- name: Topic prefix with avro
run: docker exec datagen datagen -s /tests/schema.sql -f avro -n 3 --record-size 100 -d --prefix test

- name: Clean Kafka topic
run: docker exec datagen datagen -s /tests/schema.sql -f json -d --clean

- name: Clean Kafka topic with prefix
run: docker exec datagen datagen -s /tests/schema.sql -f json -d --clean --prefix test

- name: Clean Kafka topic and schema registry
run: docker exec datagen datagen -s /tests/schema.avsc -f avro -d --clean

- name: Docker Compose Down
run: docker compose down -v
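
The new workflow steps exercise the prefix end to end: producing with `--prefix test` in both JSON and Avro formats, then cleaning up with `--clean --prefix test`. Based on the prefixing convention introduced in createTopic.js, producer.js, and cleanKafka.js below, a prefixed run is expected to target names like the following sketch. The topic name `mz_datagen_users` is hypothetical, standing in for whatever the schema in /tests/schema.sql actually defines.

// Minimal sketch of the naming convention, assuming prefix "test".
const prefix = 'test';
const topic = 'mz_datagen_users';              // hypothetical schema-defined topic
const prefixedTopic = `${prefix}_${topic}`;    // "test_mz_datagen_users"
const subject = `${prefixedTopic}-value`;      // Schema Registry subject removed by --clean
console.log(prefixedTopic, subject);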
4 changes: 3 additions & 1 deletion datagen.js
@@ -36,7 +36,8 @@ program
.option('-dr, --dry-run', 'Dry run (no data will be produced to Kafka)')
.option('-d, --debug', 'Output extra debugging information')
.option('-w, --wait <int>', 'Wait time in ms between record production', parseInt)
.option('-rs, --record-size <int>', 'Record size in bytes, eg. 1048576 for 1MB', parseInt);
.option('-rs, --record-size <int>', 'Record size in bytes, eg. 1048576 for 1MB', parseInt)
.option('-p, --prefix <char>', 'Kafka topic and schema registry prefix');

program.parse();

@@ -56,6 +57,7 @@ global.recordSize = options.recordSize;
global.wait = options.wait;
global.clean = options.clean;
global.dryRun = options.dryRun;
global.prefix = options.prefix;

if (debug) {
console.log(options);
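
The new `-p, --prefix <char>` option is parsed by commander alongside the existing flags and then exposed as `global.prefix` for the rest of the codebase. A self-contained sketch of that wiring, assuming commander v9+ (the program name here is a placeholder, not taken from this PR):

// Sketch of parsing a --prefix option with commander and exposing it as a global.
const { Command } = require('commander');

const program = new Command();
program
    .name('datagen')                                            // placeholder name
    .option('-p, --prefix <char>', 'Kafka topic and schema registry prefix');

program.parse();
const options = program.opts();

// Mirrors datagen.js: the parsed value is stored on the global object.
global.prefix = options.prefix;
console.log(`prefix: ${global.prefix}`);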
12 changes: 9 additions & 3 deletions src/dataGenerator.js
@@ -1,8 +1,8 @@
const alert = require('cli-alerts');
const crypto = require('crypto');
const createTopic = require('./kafka/createTopic');
const producer = require('./kafka/producer');
const schemaRegistryConfig = require('./kafka/schemaRegistryConfig');
const {kafkaProducer, connectKafkaProducer, disconnectKafkaProducer} = require('./kafka/producer');
const {
getAvroEncodedRecord,
registerSchema,
@@ -93,6 +93,7 @@ module.exports = async ({
schema,
number
}) => {

let payload;
if (recordSize) {
recordSize = recordSize / 2;
@@ -101,7 +102,9 @@

let registry;
let avroSchemas = {};

if(dryRun !== true){
const producer = await connectKafkaProducer();
}
for await (const iteration of asyncGenerator(number)) {
global.iterationIndex = iteration;
megaRecord = await generateMegaRecord(schema);
@@ -151,11 +154,14 @@
avroSchemas[topic]['schemaId']
);
}
await producer(recordKey, record, encodedRecord, topic);
await kafkaProducer(producer, recordKey, record, encodedRecord, topic);
}
}
}

await sleep(wait);
}
if(dryRun !== true){
await disconnectKafkaProducer(producer);
}
};
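
The generator now connects a single producer before the generation loop, reuses it for every record, and disconnects after the loop, instead of creating a fresh producer per record. A minimal sketch of that lifecycle with kafkajs; the broker address and topic are assumptions, and the producer handle is declared in the outer scope so it is visible to both the produce calls and the final disconnect:

// Sketch of the connect-once / produce-many / disconnect-once pattern with kafkajs.
const { Kafka, Partitioners } = require('kafkajs');

async function run() {
    const kafka = new Kafka({ brokers: ['localhost:9092'] });   // assumed broker
    const producer = kafka.producer({
        createPartitioner: Partitioners.DefaultPartitioner
    });
    await producer.connect();                                   // connect once, before the loop

    for (let i = 0; i < 3; i++) {
        await producer.send({
            topic: 'datagen_test_topic',                        // default topic from createTopic.js
            messages: [{ key: String(i), value: JSON.stringify({ i }) }]
        });
    }

    await producer.disconnect();                                // disconnect once, after the loop
}

run().catch(console.error);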
12 changes: 12 additions & 0 deletions src/kafka/cleanKafka.js
@@ -1,6 +1,7 @@
const kafkaConfig = require('./kafkaConfig');
const axios = require('axios');
const dotenv = require('dotenv');
const alert = require('cli-alerts');


async function deleteSchemaSubjects(topics) {
@@ -10,6 +11,7 @@ async function deleteSchemaSubjects(topics) {
process.exit();
}
for await (const topic of topics){

let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`;
await axios.delete(
url,
@@ -36,6 +38,16 @@ module.exports = async (format, topics) => {
console.log("This is a dry run, so no resources will be deleted")
return
}
if (prefix) {
// Loop through topics and add prefix
topics = topics.map(topic => `${prefix}_${topic}`);
alert({
type: `success`,
name: `Using topic with prefix: ${prefix}`,
msg: ``
});
}

const kafka = kafkaConfig();
const admin = kafka.admin();
await admin.connect();
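
With a prefix set, cleanup first rewrites the topic list and then removes both the Kafka topics and the matching `<topic>-value` Schema Registry subjects. A rough sketch of those two deletion paths, assuming the SCHEMA_REGISTRY_URL environment variable and a connected kafkajs admin client as in the surrounding code:

// Sketch of prefixed cleanup: delete Kafka topics and their Schema Registry subjects.
const axios = require('axios');

async function cleanWithPrefix(admin, topics, prefix) {
    // Same rewrite as cleanKafka.js: "users" becomes "test_users" for prefix "test".
    const prefixed = topics.map(topic => `${prefix}_${topic}`);

    // Delete the Kafka topics in one admin call.
    await admin.deleteTopics({ topics: prefixed });

    // Soft-delete the corresponding "<topic>-value" subjects from the Schema Registry.
    for (const topic of prefixed) {
        const url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`;
        await axios.delete(url);
    }
}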
11 changes: 11 additions & 0 deletions src/kafka/createTopic.js
@@ -1,5 +1,6 @@
const { ConfigResourceTypes } = require('kafkajs');
const kafkaConfig = require('./kafkaConfig');
const alert = require('cli-alerts');
const dotenv = require('dotenv');

module.exports = async (topic = 'datagen_test_topic') => {
@@ -8,6 +9,16 @@ module.exports = async (topic = 'datagen_test_topic') => {
if (debug) {
console.log(`Trying to create topic: ${topic}`);
}

if (prefix) {
topic = `${prefix}_${topic}`;
alert({
type: `success`,
name: `Using topic with prefix: ${topic}`,
msg: ``
});
}

// Check if the topic exists in the Kafka cluster if not create it
const admin = kafka.admin();
await admin.connect();
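
Topic creation applies the same prefix before checking whether the topic already exists in the cluster. A sketch of that check-then-create step with the kafkajs admin client; the partition count and replication factor are assumptions, not values taken from this PR:

// Sketch of prefixed topic creation with the kafkajs admin client.
async function createTopicWithPrefix(kafka, topic, prefix) {
    const name = prefix ? `${prefix}_${topic}` : topic;

    const admin = kafka.admin();
    await admin.connect();

    // Only create the topic if it is not already in the cluster.
    const existing = await admin.listTopics();
    if (!existing.includes(name)) {
        await admin.createTopics({
            topics: [{ topic: name, numPartitions: 1, replicationFactor: 1 }]   // assumed settings
        });
    }

    await admin.disconnect();
    return name;
}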
40 changes: 32 additions & 8 deletions src/kafka/producer.js
@@ -3,14 +3,16 @@ const kafkaConfig = require('./kafkaConfig');
const alert = require('cli-alerts');
const dotenv = require('dotenv');

module.exports = async (recordKey = null, record, encodedRecord = null, topic = 'datagen_test_topic') => {
// Produce the record to Kafka
const kafka = kafkaConfig();
async function kafkaProducer(producer, recordKey = null, record, encodedRecord = null, topic = 'datagen_test_topic') {

const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner
});
await producer.connect();
if (prefix) {
topic = `${prefix}_${topic}`;
alert({
type: `success`,
name: `Using topic with prefix: ${topic}`,
msg: ``
});
}

let payload;
if (encodedRecord) {
@@ -36,6 +38,28 @@ module.exports = async (recordKey = null, record, encodedRecord = null, topic =
name: `Record sent to Kafka topic: ${topic}`,
msg: `\nkey: ${recordKey}\nvalue:\n${JSON.stringify(record)}`
});
};

async function connectKafkaProducer() {
const kafka = kafkaConfig();
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner
});

if (debug) {
console.log(`Connecting to Kafka producer...`);
}
await producer.connect();
return producer;
}

async function disconnectKafkaProducer(producer) {
if (debug) {
console.log(`Disconnecting from Kafka producer...`);
}
await producer.disconnect();
};
}

module.exports.kafkaProducer = kafkaProducer;
module.exports.connectKafkaProducer = connectKafkaProducer;
module.exports.disconnectKafkaProducer = disconnectKafkaProducer;
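
The producer module now exports three functions instead of a single default export, so callers own the producer lifecycle. A hedged usage sketch of the new surface; the globals are normally set by datagen.js from the CLI options, and the record content here is illustrative only:

// Sketch of using the new producer API exported at the bottom of producer.js.
const {
    kafkaProducer,
    connectKafkaProducer,
    disconnectKafkaProducer
} = require('./src/kafka/producer');

// datagen.js normally sets these globals from the parsed CLI options.
global.prefix = 'test';
global.debug = false;

async function example() {
    const producer = await connectKafkaProducer();              // connect once

    const record = { id: 1, name: 'example' };                  // illustrative record
    // Signature: (producer, recordKey, record, encodedRecord, topic); encodedRecord is null for JSON.
    await kafkaProducer(producer, 'key-1', record, null, 'datagen_test_topic');

    await disconnectKafkaProducer(producer);                    // disconnect when done
}

example().catch(console.error);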