diff --git a/examples/blog.json b/examples/blog.json index ac0bc02..dfff1e1 100644 --- a/examples/blog.json +++ b/examples/blog.json @@ -3,14 +3,14 @@ "_meta": { "topic": "mz_datagen_blog_users", "key": "id", - "relationships": [ - { + "relationships": [ + { "topic": "mz_datagen_blog_posts", "parent_field": "id", "child_field": "user_id", "records_per": 2 } - ] + ] }, "id": "datatype.number(100)", "name": "internet.userName", @@ -24,14 +24,14 @@ "_meta": { "topic": "mz_datagen_blog_posts", "key": "id", - "relationships": [ - { + "relationships": [ + { "topic": "mz_datagen_blog_comments", "parent_field": "id", "child_field": "post_id", "records_per": 2 } - ] + ] }, "id": "datatype.number(1000)", "user_id": "datatype.number(100)", @@ -58,4 +58,4 @@ "views": "datatype.number({\"min\": 100, \"max\": 1000})", "status": "datatype.number(1)" } -] \ No newline at end of file +] diff --git a/examples/ecommerce.json b/examples/ecommerce.json index c3f6378..6f9390f 100644 --- a/examples/ecommerce.json +++ b/examples/ecommerce.json @@ -3,14 +3,14 @@ "_meta": { "topic": "mz_datagen_ecommerce_users", "key": "id", - "relationships": [ - { + "relationships": [ + { "topic": "mz_datagen_ecommerce_purchases", "parent_field": "id", "child_field": "user_id", "records_per": 4 } - ] + ] }, "id": "datatype.number(100)", "name": "internet.userName", @@ -24,14 +24,14 @@ "_meta": { "topic": "mz_datagen_ecommerce_purchases", "key": "id", - "relationships": [ + "relationships": [ { "topic": "mz_datagen_ecommerce_items", "parent_field": "item_id", "child_field": "id", "records_per": 1 } - ] + ] }, "id": "datatype.number(1000)", "user_id": "datatype.number(100)", @@ -49,4 +49,4 @@ "material": "commerce.productMaterial", "created_at": "date.past(5)" } -] \ No newline at end of file +] diff --git a/src/dataGenerator.ts b/src/dataGenerator.ts index 3832eec..f3326e3 100644 --- a/src/dataGenerator.ts +++ b/src/dataGenerator.ts @@ -1,15 +1,10 @@ import alert from 'cli-alerts'; import crypto from 'crypto'; -import createTopic from './kafka/createTopic.js'; -import schemaRegistryConfig from './kafka/schemaRegistryConfig.js'; -import { kafkaProducer, connectKafkaProducer, disconnectKafkaProducer } from './kafka/producer.js'; -import { - getAvroEncodedRecord, - registerSchema, - getAvroSchema -} from './schemas/schemaRegistry.js'; - +import { KafkaProducer } from './kafka/producer.js'; import { generateMegaRecord } from './schemas/generateMegaRecord.js'; +import { OutputFormat } from './formats/outputFormat.js'; +import { AvroFormat } from './formats/avroFormat.js'; +import { JsonFormat } from './formats/jsonFormat.js'; async function* asyncGenerator(number: number) { let i = 0; @@ -37,56 +32,6 @@ function sleep(s: number) { return new Promise(resolve => setTimeout(resolve, s)); } -async function prepareTopic(topic: string) { - if (global.dryRun) { - alert({ - type: `success`, - name: `Dry run: Skipping topic creation...`, - msg: `` - }); - return; - } - - alert({ - type: `success`, - name: `Creating Kafka topics...`, - msg: `` - }); - - try { - await createTopic(topic); - alert({ - type: `success`, - name: `Created topic ${topic}`, - msg: `` - }); - } catch (error) { - alert({ - type: `error`, - name: `Error creating Kafka topic, try creating it manually...`, - msg: `\n ${error.message}` - }); - process.exit(0); - } -} - -async function prepareSchema(megaRecord: any, topic: any, registry: any, avroSchemas: any) { - alert({ - type: `success`, - name: `Registering Avro schema...`, - msg: `` - }); - let avroSchema = await getAvroSchema( - topic, - megaRecord[topic].records[0] - ); - let schemaId = await registerSchema(avroSchema, registry); - avroSchemas[topic] = {}; - avroSchemas[topic]['schemaId'] = schemaId; - avroSchemas[topic]['schema'] = avroSchema; - return avroSchemas; -} - export default async function dataGenerator({ format, schema, @@ -103,47 +48,33 @@ export default async function dataGenerator({ payload = crypto.randomBytes(global.recordSize).toString('hex'); } - let registry; - let producer; - let avroSchemas = {}; - if(global.dryRun !== true){ - producer = await connectKafkaProducer(); + let producer: KafkaProducer | null = null; + if (global.dryRun !== true) { + let outputFormat: OutputFormat; + if (format === 'avro') { + outputFormat = await AvroFormat.create(); + } else if (format === 'json') { + outputFormat = new JsonFormat(); + } + + producer = await KafkaProducer.create(outputFormat); } + for await (const iteration of asyncGenerator(number)) { global.iterationIndex = iteration; let megaRecord = await generateMegaRecord(schema); if (iteration == 0) { - if (format == 'avro') { - if (global.dryRun) { - alert({ - type: `success`, - name: `Dry run: Skipping schema registration...`, - msg: `` - }); - } else { - registry = await schemaRegistryConfig(); - } - } for (const topic in megaRecord) { - await prepareTopic(topic); - if (format == 'avro' && global.dryRun !== true) { - avroSchemas = await prepareSchema( - megaRecord, - topic, - registry, - avroSchemas - ); - } + await producer?.prepare(topic, megaRecord); } } for (const topic in megaRecord) { for await (const record of megaRecord[topic].records) { - let encodedRecord = null; - let recordKey = null; + let key = null; if (record[megaRecord[topic].key]) { - recordKey = record[megaRecord[topic].key]; + key = record[megaRecord[topic].key]; } if (global.recordSize) { @@ -154,26 +85,16 @@ export default async function dataGenerator({ alert({ type: `success`, name: `Dry run: Skipping record production...`, - msg: `\n Topic: ${topic} \n Record key: ${recordKey} \n Payload: ${JSON.stringify( - record - )}` + msg: `\n Topic: ${topic} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}` }); - } else { - if (format == 'avro') { - encodedRecord = await getAvroEncodedRecord( - record, - registry, - avroSchemas[topic]['schemaId'] - ); - } - await kafkaProducer(producer, recordKey, record, encodedRecord, topic); } + + await producer?.send(key, record, topic); } } await sleep(global.wait); } - if (global.dryRun !== true) { - await disconnectKafkaProducer(producer); - } + + await producer?.close(); }; diff --git a/src/formats/avroFormat.ts b/src/formats/avroFormat.ts new file mode 100644 index 0000000..023ac0d --- /dev/null +++ b/src/formats/avroFormat.ts @@ -0,0 +1,69 @@ +import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry"; +import { Env } from "../utils/env.js"; +import { OutputFormat } from "./outputFormat"; + +export class AvroFormat implements OutputFormat { + private schemas: any = {}; + private registry: SchemaRegistry; + + static async create(): Promise { + const url = Env.required("SCHEMA_REGISTRY_URL"); + const username = Env.optional("SCHEMA_REGISTRY_USERNAME", null); + const password = Env.optional("SCHEMA_REGISTRY_PASSWORD", null); + + const configuration = { + host: url + }; + + if (password && username) { + configuration["auth"] = { + username: username, + password: password + }; + } + + const registry = new SchemaRegistry(configuration); + return new AvroFormat(registry); + } + + constructor(registry: SchemaRegistry) { + this.registry = registry; + } + + async register(schema: any, topic: string): Promise { + const options = { subject: `${schema["name"]}-value` } + try { + const resp = await this.registry.register({ + type: SchemaType.AVRO, + schema: JSON.stringify(schema) + }, + options + ) + + alert({ + type: `success`, + name: `Schema registered!`, + msg: `Subject: ${options.subject}, ID: ${resp.id}` + }); + + this.schemas[topic] = { + 'schemaId': resp.id, + 'schema': schema + }; + } catch (error) { + alert({ + type: `error`, + name: `Failed to register schema.`, + msg: `${error}` + }); + + process.exit(1); + } + } + + async encode(record: any, topic: string): Promise { + const schemaId = this.schemas[topic]['schemaId'] + const encodedRecord = await this.registry.encode(schemaId, record); + return encodedRecord; + } +} \ No newline at end of file diff --git a/src/formats/jsonFormat.ts b/src/formats/jsonFormat.ts new file mode 100644 index 0000000..e8869e3 --- /dev/null +++ b/src/formats/jsonFormat.ts @@ -0,0 +1,13 @@ +import { OutputFormat } from "./outputFormat"; + +export class JsonFormat implements OutputFormat { + + register(schema: any, topic: string): Promise { + return Promise.resolve(); + } + + encode(record: any, _: string): Promise { + const value = JSON.stringify(record); + return Promise.resolve(Buffer.from(value)); + } +} \ No newline at end of file diff --git a/src/formats/outputFormat.ts b/src/formats/outputFormat.ts new file mode 100644 index 0000000..ff3d289 --- /dev/null +++ b/src/formats/outputFormat.ts @@ -0,0 +1,5 @@ +export interface OutputFormat { + register(schema: any, topic: string): Promise; + + encode(record: any, topic: string): Promise; +} \ No newline at end of file diff --git a/src/kafka/cleanKafka.ts b/src/kafka/cleanKafka.ts index ce62439..95e5d56 100644 --- a/src/kafka/cleanKafka.ts +++ b/src/kafka/cleanKafka.ts @@ -1,16 +1,13 @@ import kafkaConfig from './kafkaConfig.js'; import axios from 'axios'; -import dotenv from 'dotenv'; import alert from 'cli-alerts'; +import { Env } from '../utils/env.js'; async function deleteSchemaSubjects(topics: any): Promise { - dotenv.config(); - if (!process.env.SCHEMA_REGISTRY_URL) { - console.error("Please set SCHEMA_REGISTRY_URL"); - process.exit(); - } + const schemaRegistryUrl = Env.required("SCHEMA_REGISTRY_URL"); + for await (const topic of topics) { - let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`; + let url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`; await axios.delete( url, { @@ -33,7 +30,6 @@ async function deleteSchemaSubjects(topics: any): Promise { } export default async function cleanKafka(format: string, topics: any): Promise { - if (global.dryRun) { console.log("This is a dry run, so no resources will be deleted") return @@ -66,5 +62,4 @@ export default async function cleanKafka(format: string, topics: any): Promise { const topics = await admin.listTopics(); if (!topics.includes(topic)) { - let replicationFactor = await getReplicationFactor(admin); let topicConfigs = [ diff --git a/src/kafka/kafkaConfig.ts b/src/kafka/kafkaConfig.ts index 85e38e0..f16276b 100644 --- a/src/kafka/kafkaConfig.ts +++ b/src/kafka/kafkaConfig.ts @@ -1,28 +1,12 @@ -import { Kafka, KafkaConfig, Mechanism, SASLOptions } from 'kafkajs'; -import dotenv from 'dotenv'; -import alert from 'cli-alerts'; - -interface MyKafkaConfig extends KafkaConfig { - sasl?: SASLOptions | Mechanism; -} +import { Kafka, KafkaConfig } from 'kafkajs'; +import { Env } from '../utils/env.js'; export default async function kafkaConfig() { - dotenv.config(); - // Abort if kafka details are not defined - if (!process.env.KAFKA_BROKERS) { - alert({ - type: `error`, - name: `Kafka details not defined`, - msg: `\n Please define the Kafka details in the .env file` - }); - process.exit(0); - } + const kafkaBrokers = Env.optional("KAFKA_BROKERS", "localhost:9092"); + const kafkaUser = Env.optional("SASL_USERNAME", null); + const kafkaPassword = Env.optional("SASL_PASSWORD", null); + const saslMechanism = Env.optional("SASL_MECHANISM", 'plain'); - // Kafka details - const kafkaBrokers = process.env.KAFKA_BROKERS || 'localhost:9092'; - const kafkaUser = process.env.SASL_USERNAME || null; - const kafkaPassword = process.env.SASL_PASSWORD || null; - const saslMechanism = process.env.SASL_MECHANISM || 'plain'; if (kafkaUser && kafkaPassword) { const conf: KafkaConfig = { brokers: [kafkaBrokers], @@ -35,7 +19,6 @@ export default async function kafkaConfig() { ssl: true, connectionTimeout: 10_000, authenticationTimeout: 10_000 - // logLevel: logLevel.DEBUG, }; const kafka = new Kafka(conf); return kafka; @@ -46,7 +29,6 @@ export default async function kafkaConfig() { ssl: false, connectionTimeout: 10_000, authenticationTimeout: 10_000 - // logLevel: logLevel.DEBUG, }); return kafka; }; diff --git a/src/kafka/producer.ts b/src/kafka/producer.ts index fee26ee..79d5b06 100644 --- a/src/kafka/producer.ts +++ b/src/kafka/producer.ts @@ -1,60 +1,75 @@ -import { Kafka, Partitioners, logLevel } from 'kafkajs'; -import kafkaConfig from './kafkaConfig.js'; -import alert from 'cli-alerts'; +import { Partitioners, Producer } from 'kafkajs'; +import kafkaConfig from './kafkaConfig.js'; +import alert from 'cli-alerts'; +import { OutputFormat } from '../formats/outputFormat.js'; +import createTopic from './createTopic.js'; -export async function kafkaProducer(producer: any, recordKey = null, record, encodedRecord = null, topic = 'datagen_test_topic') { +export class KafkaProducer { + private producer: Producer; + private format: OutputFormat; - if (global.prefix) { - topic = `${global.prefix}_${topic}`; + static async create(format: OutputFormat): Promise { + const kafka = await kafkaConfig(); + const producer = kafka.producer({ + createPartitioner: Partitioners.DefaultPartitioner + }); + + if (global.debug) { + console.log(`Connecting to Kafka brokers...`); + } + await producer.connect(); + return new KafkaProducer(producer, format); + } + + constructor(producer: Producer, format: OutputFormat) { + this.producer = producer; + this.format = format; + } + + async prepare(topic: string, schema: any): Promise { alert({ type: `success`, - name: `Using topic with prefix: ${topic}`, + name: `Creating Kafka topics...`, msg: `` }); - } - let payload; - if (encodedRecord) { - payload = encodedRecord; - } else { - payload = JSON.stringify(record); - } + try { + await createTopic(topic); + alert({ + type: `success`, + name: `Created topic ${topic}`, + msg: `` + }); - if (recordKey !== null) { - recordKey = recordKey.toString(); + await this.format.register(schema, topic); + } catch (error) { + alert({ + type: `error`, + name: `Error creating Kafka topic, try creating it manually...`, + msg: `\n ${error.message}` + }); + process.exit(0); + } } - await producer.send({ - topic: topic, - messages: [{ - key: recordKey, - value: payload - }] - }); - - alert({ - type: `success`, - name: `Record sent to Kafka topic: ${topic}`, - msg: `\nkey: ${recordKey}\nvalue:\n${JSON.stringify(record)}` - }); -}; - -export async function connectKafkaProducer() { - const kafka = await kafkaConfig(); - const producer = kafka.producer({ - createPartitioner: Partitioners.DefaultPartitioner - }); - - if (global.debug) { - console.log(`Connecting to Kafka producer...`); + async send(key: any, value: any, topic: string) { + let encoded = await this.format.encode(value, topic); + await this.producer.send({ + topic: topic, + messages: [{ + key: key?.toString(), + value: encoded + }] + }); + + alert({ + type: `success`, + name: `Record sent to Kafka topic: ${topic}`, + msg: `\nkey: ${key}\nvalue:\n${JSON.stringify(value)}` + }); } - await producer.connect(); - return producer; -} -export async function disconnectKafkaProducer(producer: any) { - if (global.debug) { - console.log(`Disconnecting from Kafka producer...`); + async close(): Promise { + await this.producer.disconnect(); } - await producer.disconnect(); } diff --git a/src/kafka/schemaRegistryConfig.ts b/src/kafka/schemaRegistryConfig.ts deleted file mode 100644 index eb117b7..0000000 --- a/src/kafka/schemaRegistryConfig.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; -import dotenv from 'dotenv'; -import alert from 'cli-alerts'; - -export default async function schemaRegistryConfig(): Promise { - dotenv.config(); - // Schema Registry details - // Abort if SR details are not defined - if ( - !process.env.SCHEMA_REGISTRY_URL - ) { - alert({ - type: `error`, - name: `Schema Registry details not defined`, - msg: `\n Please define the Schema Registry details in the .env file` - }); - process.exit(0); - } - - const url = process.env.SCHEMA_REGISTRY_URL; - const username = process.env.SCHEMA_REGISTRY_USERNAME || null; - const password = process.env.SCHEMA_REGISTRY_PASSWORD || null; - - if (password && username) { - const registry = new SchemaRegistry({ - host: url, - auth: { - username: username, - password: password - } - }); - - return registry; - } - const registry = new SchemaRegistry({ - host: url - }); - return registry; - -} diff --git a/src/schemas/parseJsonSchema.ts b/src/schemas/parseJsonSchema.ts index 56f98ec..0146531 100644 --- a/src/schemas/parseJsonSchema.ts +++ b/src/schemas/parseJsonSchema.ts @@ -7,12 +7,11 @@ export default function parseJsonSchema(schemaFile: any): Promise { msg: `` }); + let parsed = JSON.parse(schemaFile); if (global.debug) { - const parsed = JSON.parse(schemaFile); console.log(parsed); } - let parsed = JSON.parse(schemaFile); if (!Array.isArray(parsed)) { parsed = [parsed]; } diff --git a/src/schemas/schemaRegistry.ts b/src/schemas/schemaRegistry.ts index 87ebd86..e049cf5 100644 --- a/src/schemas/schemaRegistry.ts +++ b/src/schemas/schemaRegistry.ts @@ -1,4 +1,3 @@ -import { SchemaType } from '@kafkajs/confluent-schema-registry'; import avroTypes from '@avro/types'; const { Type } = avroTypes; import alert from 'cli-alerts'; @@ -18,13 +17,13 @@ function nameHook() { } // @ts-ignore -export async function getAvroSchema(topic, record){ +export async function getAvroSchema(topic, record) { // @ts-ignore - let avroSchema = Type.forValue(record,{typeHook: nameHook()}).schema(); + let avroSchema = Type.forValue(record, { typeHook: nameHook() }).schema(); avroSchema["name"] = topic avroSchema["namespace"] = "com.materialize" - if(global.prefix){ + if (global.prefix) { avroSchema["name"] = `${global.prefix}_${topic}`; } @@ -38,37 +37,3 @@ export async function getAvroSchema(topic, record){ return avroSchema; } - -export async function registerSchema(avroSchema: any, registry: any) { - let options = {subject: avroSchema["name"] + "-value"} - let schemaId; - try { - const resp = await registry.register({ - type: SchemaType.AVRO, - schema: JSON.stringify(avroSchema) - }, - options - ) - - schemaId = resp.id - - alert({ - type: `success`, - name: `Schema registered!`, - msg: `Subject: ${options.subject}, ID: ${schemaId}` - }); - } catch (error) { - alert({ - type: `error`, - name: `There was a problem registering schema.`, - msg: `${error}` - }); - } - - return schemaId; -} - -export async function getAvroEncodedRecord(record: any, registry: any, schema_id: any) { - let encodedRecord = await registry.encode(schema_id, record); - return encodedRecord; -} diff --git a/src/utils/end.ts b/src/utils/end.ts index 54d5c69..8926259 100644 --- a/src/utils/end.ts +++ b/src/utils/end.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv'; import alert from 'cli-alerts'; export default async function end(): Promise { diff --git a/src/utils/env.ts b/src/utils/env.ts new file mode 100644 index 0000000..9fd274b --- /dev/null +++ b/src/utils/env.ts @@ -0,0 +1,25 @@ +import dotenv from 'dotenv'; + +export class Env { + + static { + dotenv.config(); + } + + static required(name: string): string { + const value = process.env[name]; + if (!value) { + alert({ + type: `error`, + name: `Missing required environment variable ${name}` + }); + process.exit(1); + } + + return value; + } + + static optional(name: string, orElse: string): string { + return process.env[name] ?? orElse; + } +} \ No newline at end of file diff --git a/tests/datagen.test.ts b/tests/datagen.test.ts index e475bab..4a97d54 100644 --- a/tests/datagen.test.ts +++ b/tests/datagen.test.ts @@ -1,4 +1,3 @@ -import { Command } from 'commander'; import { execSync } from 'child_process'; const datagen = args => { @@ -17,7 +16,6 @@ describe('Schema Parsing Tests', () => { const schema = './tests/schema.avsc'; const output = datagen(`-s ${schema} -n 2`); expect(output).toContain('Parsing Avro schema...'); - expect(output).toContain('Dry run: Skipping topic creation...'); expect(output).toContain('Dry run: Skipping record production...'); expect(output).toContain('Stopping the data generator'); }); @@ -25,7 +23,6 @@ describe('Schema Parsing Tests', () => { const schema = './tests/products.sql'; const output = datagen(`-s ${schema} -n 2`); expect(output).toContain('Parsing schema...'); - expect(output).toContain('Dry run: Skipping topic creation...'); expect(output).toContain('Dry run: Skipping record production...'); expect(output).toContain('Stopping the data generator'); }); @@ -33,7 +30,6 @@ describe('Schema Parsing Tests', () => { const schema = './tests/schema.json'; const output = datagen(`-s ${schema} -n 2`); expect(output).toContain('Parsing JSON schema...'); - expect(output).toContain('Dry run: Skipping topic creation...'); expect(output).toContain('Dry run: Skipping record production...'); expect(output).toContain('Stopping the data generator'); });