diff --git a/README.md b/README.md
index 5ed387f..1400aa7 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Datagen CLI
 
-This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON or Avro format or to Postgres.
+This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON, Avro, or Protobuf format, or to Postgres.
 
 The benefits of using this datagen tool are:
 - You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream.
@@ -92,7 +92,7 @@ Fake Data Generator
 Options:
   -V, --version          output the version number
   -s, --schema <char>    Schema file to use
-  -f, --format <char>    The format of the produced data (choices: "json", "avro", "postgres", "webhook", "mysql", default: "json")
+  -f, --format <char>    The format of the produced data (choices: "json", "avro", "postgres", "webhook", "mysql", "proto", default: "json")
   -n, --number <char>    Number of records to generate. For infinite records, use -1 (default: "10")
   -c, --clean            Clean (delete) Kafka topics and schema subjects previously created
   -dr, --dry-run         Dry run (no data will be produced to Kafka)
@@ -244,6 +244,45 @@ Here is the general syntax for a JSON input schema:
 
 Go to the [end-to-end ecommerce tutorial](./examples/ecommerce) to walk through an example that uses a JSON input schema with relational data.
 
+### JSON Schema with Protobuf
+You can use your JSON input schema with Protobuf by first creating a `.proto` schema with your `<messageType>` definitions:
+```protobuf
+syntax = "proto3";
+
+package <package>;
+
+message <messageType> {
+  <type> <field_name> = 1;
+  <type> <field_name> = 2;
+  ...
+}
+```
+
+Then, in your JSON input schema, reference the `<messageType>` and the directory in which the `.proto` file(s) reside:
+```json
+[
+  {
+    "_meta": {
+      "topic": "<topic name>",
+      "key": "<key field>",
+      "proto": {
+        "dir": "<directory containing the .proto file(s)>",
+        "schema": "<messageType>"
+      }
+    },
+    "<field name>": "<FakerJS expression>",
+    "<field name>": "<FakerJS expression>",
+    ...
+  }
+]
+```
+#### Producing Protobuf messages
+Once you have a JSON input schema with the appropriate Protobuf schema references, you can produce messages by setting `-f proto` or `--format proto` on the command:
+```bash
+datagen \
+  -s tests/schema-proto.json \
+  -f proto \
+  -n 1000
+```
 
 ### SQL Schema
 
diff --git a/datagen.ts b/datagen.ts
index f11c802..71e6677 100755
--- a/datagen.ts
+++ b/datagen.ts
@@ -23,7 +23,7 @@ program
     .requiredOption('-s, --schema <char>', 'Schema file to use')
     .addOption(
         new Option('-f, --format <char>', 'The format of the produced data')
-            .choices(['json', 'avro', 'postgres', 'webhook', 'mysql'])
+            .choices(['json', 'avro', 'postgres', 'webhook', 'mysql', 'proto'])
             .default('json')
     )
     .addOption(
diff --git a/package.json b/package.json
index 80b9acc..bf05e15 100644
--- a/package.json
+++ b/package.json
@@ -54,7 +54,9 @@
     "kafkajs": "^2.2.3",
     "mysql2": "^3.9.2",
     "node-sql-parser": "^4.6.1",
-    "pg": "^8.11.0"
+    "pg": "^8.11.0",
+    "protobufjs": "^6.11.4",
+    "glob": "10.3.10"
   },
   "devDependencies": {
     "@types/jest": "^29.4.0",
diff --git a/src/formats/protoFormat.ts b/src/formats/protoFormat.ts
new file mode 100644
index 0000000..6361545
--- /dev/null
+++ b/src/formats/protoFormat.ts
@@ -0,0 +1,87 @@
+import { OutputFormat } from "./outputFormat";
+import protobuf from "protobufjs";
+import alert from "cli-alerts";
+import { globSync } from "glob";
+
+export class ProtoFormat implements OutputFormat {
+    private schemas: any = {};
+    private schemaFiles: Set<string>;
+
+    static async getProtoSchemas(megaRecord: any, protoSchemaFiles: string[]) {
+
+        if (!protoSchemaFiles || protoSchemaFiles.length === 0) {
+            protoSchemaFiles = [];
+            protoSchemaFiles.push(...(await ProtoFormat.getProtoSchemaFiles(megaRecord)));
+        }
+
+        const protoSchemas = {};
+        const protoRoot = protobuf.loadSync(protoSchemaFiles);
+        for (const topic in megaRecord) {
+
+            const protoSchema = {};
+            try {
+                protoSchema["messageType"] = protoRoot.lookupType(megaRecord[topic].schema);
+                protoSchema["name"] = topic;
+                protoSchema["namespace"] = megaRecord[topic].schema;
+
+                if (global.debug) {
+                    alert({
+                        type: `success`,
+                        name: `Proto Schema for topic ${topic}:`,
+                        msg: `\n ${JSON.stringify(protoSchema, null, 2)}`
+                    });
+                }
+
+                protoSchemas[topic] = protoSchema;
+            } catch (error) {
+                alert({
+                    type: `error`,
+                    name: `protobuf lookup type error for schema ${megaRecord[topic].schema}`,
+                    msg: `${error}`
+                });
+                process.exit(1);
+            }
+        }
+
+        return protoSchemas;
+    }
+
+    static async getProtoSchemaFiles(megaRecord: any) {
+        const protoFiles = new Set<string>();
+        for (const topic in megaRecord) {
+            (await ProtoFormat.getProtoSchemaFilesSync(megaRecord[topic].schemaDir)).forEach(file => protoFiles.add(file));
+        }
+        return protoFiles;
+    }
+
+    static async getProtoSchemaFilesSync(directory: string) {
+        if (!directory) {
+            return [];
+        }
+        return globSync(directory + (directory.endsWith("/") ? "" : "/") + "**/*.proto");
+    }
+
+    async register(megaRecord: any): Promise<void> {
+        this.schemaFiles = await ProtoFormat.getProtoSchemaFiles(megaRecord);
+        this.schemas = await ProtoFormat.getProtoSchemas(megaRecord, Array.from(this.schemaFiles));
+    }
+
+    async encode(record: any, topic: string): Promise<Uint8Array> {
+        const messageType = this.schemas[topic]['messageType'];
+
+        // check whether the record is a valid message of this type
+        const error = messageType.verify(record);
+        if (global.debug && error) {
+            alert({
+                type: `warning`,
+                name: `${record} with ${this.schemas[topic]['namespace']} is not valid`,
+                msg: `${error}`
+            });
+        }
+        // if the record is not valid, convert the plain object; otherwise create the message directly
+        const message = error ? messageType.fromObject(record) : messageType.create(record);
+
+        return messageType.encode(message).finish();
+    }
+}
\ No newline at end of file
diff --git a/src/kafkaDataGenerator.ts b/src/kafkaDataGenerator.ts
index 316d4c9..975189f 100644
--- a/src/kafkaDataGenerator.ts
+++ b/src/kafkaDataGenerator.ts
@@ -6,6 +6,9 @@ import { AvroFormat } from './formats/avroFormat.js';
 import { JsonFormat } from './formats/jsonFormat.js';
 import sleep from './utils/sleep.js';
 import asyncGenerator from './utils/asyncGenerator.js';
+import { accessRecordKey } from './utils/recordKey.js';
+import { ProtoFormat } from "./formats/protoFormat.js";
+
 
 export default async function kafkaDataGenerator({
     format,
@@ -26,6 +29,8 @@ export default async function kafkaDataGenerator({
         outputFormat = await AvroFormat.create();
     } else if (format === 'json') {
         outputFormat = new JsonFormat();
+    } else if (format === 'proto') {
+        outputFormat = new ProtoFormat();
     }
 
     producer = await KafkaProducer.create(outputFormat);
@@ -37,14 +42,18 @@ export default async function kafkaDataGenerator({
 
         if (iteration === 0) {
             await producer?.prepare(megaRecord);
-            if (global.debug && global.dryRun && format === 'avro') {
-                await AvroFormat.getAvroSchemas(megaRecord);
+            if (global.debug && global.dryRun) {
+                if (format === 'avro') {
+                    await AvroFormat.getAvroSchemas(megaRecord);
+                } else if (format === 'proto') {
+                    await ProtoFormat.getProtoSchemas(megaRecord, []);
+                }
             }
         }
 
         for (const topic in megaRecord) {
             for await (const record of megaRecord[topic].records) {
-                let key = null;
+                let key = accessRecordKey(megaRecord[topic].key, record);
                 if (record[megaRecord[topic].key]) {
                     key = record[megaRecord[topic].key];
                 }
diff --git a/src/schemas/generateMegaRecord.ts b/src/schemas/generateMegaRecord.ts
index 9f6a193..1a62e50 100644
--- a/src/schemas/generateMegaRecord.ts
+++ b/src/schemas/generateMegaRecord.ts
@@ -82,6 +82,12 @@ export async function generateMegaRecord(schema: any) {
             });
         }
 
+        // record the proto schema and directory for the topic
+        if ("proto" in _meta) {
+            megaRecord[topic]["schemaDir"] = _meta.proto.dir;
+            megaRecord[topic]["schema"] = _meta.proto.schema;
+        }
+
         // for records that already exist, generate values
         // for every field that doesn't already have a value.
megaRecord[topic]["key"] = _meta.key diff --git a/src/utils/recordKey.ts b/src/utils/recordKey.ts new file mode 100644 index 0000000..d883d9d --- /dev/null +++ b/src/utils/recordKey.ts @@ -0,0 +1,3 @@ +export function accessRecordKey(path: string, record: any): any { + return path?.split('.').reduce((level, key) => level && level[key], record); +} \ No newline at end of file diff --git a/tests/datagen.test.ts b/tests/datagen.test.ts index e989bb3..4076a6e 100644 --- a/tests/datagen.test.ts +++ b/tests/datagen.test.ts @@ -41,6 +41,13 @@ describe('Schema Parsing Tests', () => { expect(output).toContain('Dry run: Skipping record production...'); expect(output).toContain('Stopping the data generator'); }); + test('should parse json schema with proto definitions', () => { + const schema = './tests/schema-proto.json'; + const output = datagen(`-s ${schema} -n 2 -f proto`); + expect(output).toContain('Parsing JSON schema...'); + expect(output).toContain('Dry run: Skipping record production...'); + expect(output).toContain('Stopping the data generator'); + }); }); @@ -54,6 +61,15 @@ describe('Test missing schema file', () => { expect(error.status).toBe(1); } }); + test('should return error if proto schema is not defined', () => { + const schema = './tests/iterationIndex-proto.json' + try { + const output = datagen(`-s ${schema} -n 2 -f proto`); + } catch (error) { + expect(error.stdout.toString()).toContain(`Error: no such type`); + expect(error.status).toBe(1); + } + }); }); describe('Test record size', () => { diff --git a/tests/iterationIndex-proto.json b/tests/iterationIndex-proto.json new file mode 100644 index 0000000..5cbf896 --- /dev/null +++ b/tests/iterationIndex-proto.json @@ -0,0 +1,20 @@ +{ + "_meta": { + "topic": "air_quality", + "key": "id", + "proto": { + "dir": "./tests", + "schema": "datagen.dne" + } + }, + "id": "iteration.index", + "timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')", + "location": { + "latitude": "faker.datatype.number({ max: 90, min: -90})", + "longitude": "faker.datatype.number({ max: 180, min: -180})" + }, + "pm25": "faker.datatype.float({ min: 10, max: 90 })", + "pm10": "faker.datatype.float({ min: 10, max: 90 })", + "temperature": "faker.datatype.float({ min: -10, max: 120 })", + "humidity": "faker.datatype.float({ min: 0, max: 100 })" +} diff --git a/tests/schema-proto.json b/tests/schema-proto.json new file mode 100644 index 0000000..0d42d10 --- /dev/null +++ b/tests/schema-proto.json @@ -0,0 +1,77 @@ +[ + { + "_meta": { + "topic": "mz_datagen_users", + "key": "id", + "proto": { + "dir": "./tests", + "schema": "datagen.User" + }, + "relationships": [ + { + "topic": "mz_datagen_posts", + "parent_field": "id", + "child_field": "user_id", + "records_per": 2 + } + ] + }, + "nested": { + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()" + }, + "id": "faker.datatype.number(100)", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()", + "city": "faker.address.city()", + "company": "faker.company.name()" + }, + { + "_meta": { + "topic": "mz_datagen_posts", + "key": "id", + "proto": { + "dir": "./tests", + "schema": "datagen.Post" + }, + "relationships": [ + { + "topic": "mz_datagen_comments", + "parent_field": "id", + "child_field": "post_id", + "records_per": 2 + } + ] + }, + "id": "faker.datatype.number(1000)", + "user_id": "faker.datatype.number(100)", + "title": "faker.lorem.sentence()", + 
"body": "faker.lorem.paragraph()" + }, + { + "_meta": { + "topic": "mz_datagen_comments", + "key": "id", + "proto": { + "dir": "./tests", + "schema": "datagen.Comment" + }, + "relationships": [ + { + "topic": "mz_datagen_users", + "parent_field": "user_id", + "child_field": "id", + "records_per": 1 + } + ] + }, + "id": "faker.datatype.number(2000)", + "user_id": "faker.datatype.number(100)", + "body": "faker.lorem.paragraph()", + "post_id": "faker.datatype.number(1000)", + "views": "faker.datatype.number({min: 100, max: 1000})", + "status": "faker.datatype.number(1)" + } +] diff --git a/tests/schema.proto b/tests/schema.proto new file mode 100644 index 0000000..215315f --- /dev/null +++ b/tests/schema.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package datagen; + +// Definition for the User data +message User { + message Nested { + string phone = 1; + string website = 2; + } + + int32 id = 1; // Assuming IDs are integers + string name = 2; + string email = 3; + string phone = 4; + string website = 5; + string city = 6; + string company = 7; + Nested nested = 8; // Nested message for phone and website +} + +// Definition for the Post data +message Post { + int32 id = 1; + int32 user_id = 2; // Assuming this is a reference to a User ID + string title = 3; + string body = 4; +} + +// Definition for the Comment data +message Comment { + int32 id = 1; + int32 user_id = 2; // Assuming this is a reference to a User ID + int32 post_id = 3; // Assuming this is a reference to a Post ID + string body = 4; + int32 views = 5; // Assuming views is an integer + int32 status = 6; // Assuming status is an integer representing some enum +} \ No newline at end of file