Enhance Datagen with Protobuf Support and Nested Key References #112

Open · wants to merge 6 commits into base: main · Changes from 4 commits
10 changes: 7 additions & 3 deletions README.md
@@ -1,6 +1,6 @@
# Datagen CLI

This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON or Avro format or to Postgres.
This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON, Avro, or Protobuf format, or to Postgres.

The benefits of using this datagen tool are:
- You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream.
@@ -85,7 +85,7 @@ Fake Data Generator
Options:
-V, --version output the version number
-s, --schema <char> Schema file to use
-f, --format <char> The format of the produced data (choices: "json", "avro", "postgres", "webhook", default: "json")
-f, --format <char> The format of the produced data (choices: "json", "avro", "postgres", "webhook", "proto", default: "json")
-n, --number <char> Number of records to generate. For infinite records, use -1 (default: "10")
-c, --clean Clean (delete) Kafka topics and schema subjects previously created
-dr, --dry-run Dry run (no data will be produced to Kafka)
@@ -213,7 +213,11 @@ Here is the general syntax for a JSON input schema:
{
"_meta": {
"topic": "<my kafka topic>",
"key": "<field to be used for kafka record key>" ,
"key": "<field to be used for kafka record key>" ,
"proto": {
Contributor: It's ok to add an example that uses protobuf output format, but we shouldn't make breaking changes to what is required for the input json schema.

Author: I am updating the README to add a separate section called "JSON Schema with Protobuf". The `proto {}` meta field is intended to be optional.

"dir": "<directory with protobuf schemas>",
"schema": "<protobfuf message schema name>"
},
"relationships": [
{
"topic": "<topic for dependent dataset>",
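For reference, here is a minimal sketch of a complete input schema using the optional `proto` block, together with a matching invocation. The `datagen.User` message name, the `./tests` directory, and the generator expressions are illustrative values borrowed from this PR's test fixtures, not requirements:

```json
{
    "_meta": {
        "topic": "mz_datagen_users",
        "key": "id",
        "proto": {
            "dir": "./tests",
            "schema": "datagen.User"
        }
    },
    "id": "iteration.index",
    "name": "faker.internet.userName()"
}
```

Saved as `schema.json`, this would be exercised with `datagen -s schema.json -f proto -n 10`; since the `proto` block is optional, the same schema should still work with the other output formats.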
2 changes: 1 addition & 1 deletion datagen.ts
@@ -23,7 +23,7 @@ program
.requiredOption('-s, --schema <char>', 'Schema file to use')
.addOption(
new Option('-f, --format <char>', 'The format of the produced data')
.choices(['json', 'avro', 'postgres', 'webhook'])
.choices(['json', 'avro', 'postgres', 'webhook', 'proto'])
.default('json')
)
.addOption(
6 changes: 4 additions & 2 deletions package.json
@@ -39,7 +39,7 @@
},
"dependencies": {
"@avro/types": "^1.0.25",
"@faker-js/faker": "^7.6.0",
"@faker-js/faker": "^8.0.0",
"@kafkajs/confluent-schema-registry": "^3.3.0",
"@types/node": "^18.14.6",
"arg": "^5.0.2",
@@ -53,7 +53,9 @@
"dotenv": "^16.0.2",
"kafkajs": "^2.2.3",
"node-sql-parser": "^4.6.1",
"pg": "^8.11.0"
"pg": "^8.11.0",
"protobufjs": "^6.11.4",
"glob": "10.3.10"
},
"devDependencies": {
"@types/jest": "^29.4.0",
87 changes: 87 additions & 0 deletions src/formats/protoFormat.ts
@@ -0,0 +1,87 @@
import { OutputFormat } from "./outputFormat";
import protobuf from "protobufjs";
import alert from "cli-alerts";
import { globSync } from "glob";

export class ProtoFormat implements OutputFormat {
private schemas: any = {};
private schemaFiles: Set<string>;

static async getProtoSchemas(megaRecord: any, protoSchemaFiles: string[]) {

if (!protoSchemaFiles || protoSchemaFiles.length === 0) {
protoSchemaFiles = [];
protoSchemaFiles.push(...(await ProtoFormat.getProtoSchemaFiles(megaRecord)));
}

const protoSchemas = {};
const protoRoot = protobuf.loadSync(protoSchemaFiles);
for (const topic in megaRecord) {

const protoSchema = {};
try {
protoSchema["messageType"] = protoRoot.lookupType(megaRecord[topic].schema);
protoSchema["name"] = topic
protoSchema["namespace"] = megaRecord[topic].schema

if (global.debug) {
alert({
type: `success`,
name: `Proto Schema for topic ${topic}:`,
msg: `\n ${JSON.stringify(protoSchema, null, 2)}`
});
}

protoSchemas[topic] = protoSchema;
} catch (error) {
alert({
type: `error`,
name: `protobuf lookup type error for schema ${megaRecord[topic].schema}`,
msg: `${error}`
});
process.exit(1);

}
}

return protoSchemas;
}

static async getProtoSchemaFiles(megaRecord: any) {
const protoFiles = new Set<string>();
for (const topic in megaRecord) {
(await ProtoFormat.getProtoSchemaFilesSync(megaRecord[topic].schemaDir)).forEach(file => protoFiles.add(file));
}
return protoFiles;
}

// note: wraps the synchronous globSync; declared async so callers can await it uniformly
static async getProtoSchemaFilesSync(directory: string) {
if (!directory) {
return [];
}
// match every .proto file under the directory, recursively
return globSync(directory + (directory.endsWith("/") ? "" : "/") + "**/*.proto");
}

async register(megaRecord: any): Promise<void> {
this.schemaFiles = await ProtoFormat.getProtoSchemaFiles(megaRecord);
this.schemas = await ProtoFormat.getProtoSchemas(megaRecord, Array.from(this.schemaFiles));
}
Comment on lines +65 to +68

Collaborator: Thank you for that PR, it looks great! One thing that I believe might be missing is that the schemas do not get registered in the CSR, so I'm not 100% sure how consumers will decode the Protobuf messages.

Author (@recursethis), Jun 24, 2024: You do not need a CSR to use protos with Kafka. Often, consumers will have their own references to the schemas. The datagen tool's protobuf support should be independent of how consumers get the protos; the tool is about message production. To decode the messages, consumers have the option of:

  1. having the .proto files or the classes generated from them (a minimal sketch follows below), or
  2. using the CSR.

Is (2) a requirement for this PR?

async encode(record: any, topic: string): Promise<Buffer> {
const messageType = this.schemas[topic]['messageType'];

// check if the message is valid
const error = messageType.verify(record);
if (global.debug && error) {
alert({
type: `warning`,
name: `${JSON.stringify(record)} with ${this.schemas[topic]['namespace']} is not valid`,
msg: `${error}`
});
}
// if verification failed, fall back to fromObject(), which coerces the
// plain object's fields into the expected protobuf types
const message = error ? messageType.fromObject(record) : messageType.create(record);

return messageType.encode(message).finish();
}
}
15 changes: 12 additions & 3 deletions src/kafkaDataGenerator.ts
@@ -6,6 +6,9 @@ import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';
import sleep from './utils/sleep.js';
import asyncGenerator from './utils/asyncGenerator.js';
import { accessRecordKey } from './utils/recordKey.js';
import { ProtoFormat } from "./formats/protoFormat.js";


export default async function kafkaDataGenerator({
format,
@@ -26,6 +29,8 @@
outputFormat = await AvroFormat.create();
} else if (format === 'json') {
outputFormat = new JsonFormat();
} else if (format === 'proto') {
outputFormat = new ProtoFormat();
}

producer = await KafkaProducer.create(outputFormat);
Expand All @@ -37,14 +42,18 @@ export default async function kafkaDataGenerator({

if (iteration === 0) {
await producer?.prepare(megaRecord);
if (global.debug && global.dryRun && format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
if (global.debug && global.dryRun) {
if (format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
} else if (format === 'proto') {
await ProtoFormat.getProtoSchemas(megaRecord, []);
}
}
}

for (const topic in megaRecord) {
for await (const record of megaRecord[topic].records) {
let key = null;
let key = accessRecordKey(megaRecord[topic].key, record);
if (record[megaRecord[topic].key]) {
key = record[megaRecord[topic].key];
}
6 changes: 6 additions & 0 deletions src/schemas/generateMegaRecord.ts
@@ -82,6 +82,12 @@ export async function generateMegaRecord(schema: any) {
});
}

// specify the proto field for the topic
if ("proto" in _meta) {
Contributor: I think this should be invalid if the user doesn't specify proto output format. In other words, if they have proto in _meta but are using -f avro for output, then _meta.proto should be ignored.

Author: The problem with this is that I would have to pass the format into the generateMegaRecord method to skip the `if ("proto" in _meta)` check. The _meta.proto is effectively ignored already by all output formats, including Avro, because _meta.proto is only referenced in protoFormat.ts.

megaRecord[topic]["schemaDir"] = _meta.proto.dir;
megaRecord[topic]["schema"] = _meta.proto.schema;
}

// for records that already exist, generate values
// for every field that doesn't already have a value.
megaRecord[topic]["key"] = _meta.key
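If gating by output format ever becomes desirable, a hypothetical shape for it (this helper and the threaded `format` argument are not part of this PR) might be:

```ts
// hypothetical helper: honor _meta.proto only when producing protobuf output,
// so other formats never see the proto schema hints
function applyProtoMeta(topicRecord: any, _meta: any, format: string): void {
    if (format === 'proto' && "proto" in _meta) {
        topicRecord["schemaDir"] = _meta.proto.dir;
        topicRecord["schema"] = _meta.proto.schema;
    }
}
```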
3 changes: 3 additions & 0 deletions src/utils/recordKey.ts
@@ -0,0 +1,3 @@
export function accessRecordKey(path: string, record: any): any {
return path?.split('.').reduce((level, key) => level && level[key], record);
}
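A quick illustration of the nested key lookup this helper enables (the record shape is illustrative, echoing the `User.Nested` message in tests/schema.proto):

```ts
import { accessRecordKey } from './utils/recordKey.js';

const record = { id: 1, nested: { phone: "555-0100", website: "example.com" } };

accessRecordKey("id", record);             // 1 (top-level keys still work)
accessRecordKey("nested.phone", record);   // "555-0100"
accessRecordKey("nested.missing", record); // undefined rather than a throw
```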
25 changes: 25 additions & 0 deletions tests/datagen.test.ts
@@ -41,6 +41,13 @@
expect(output).toContain('Dry run: Skipping record production...');
expect(output).toContain('Stopping the data generator');
});
test('should parse json schema with proto definitions', () => {
const schema = './tests/schema.json';
const output = datagen(`-s ${schema} -n 2 -f proto`);
expect(output).toContain('Parsing JSON schema...');
expect(output).toContain('Dry run: Skipping record production...');
expect(output).toContain('Stopping the data generator');
});
});


@@ -54,6 +61,24 @@
expect(error.status).toBe(1);
}
});
test('should return error if proto schema is not defined', () => {
const schema = './tests/iterationIndex.json';
try {
datagen(`-s ${schema} -n 2 -f proto`);
} catch (error) {
expect(error.stdout.toString()).toContain(`Error: no such type`);
expect(error.status).toBe(1);
}
});
});

describe('Test record size', () => {
6 changes: 5 additions & 1 deletion tests/iterationIndex.json
@@ -1,7 +1,11 @@
{
"_meta": {
"topic": "air_quality",
"key": "id"
"key": "id",
"proto": {
Contributor: Let's not change existing tests, but rather add a new test.

Author: I am adding an iterationIndex-proto.json file to use instead.

"dir": "./tests",
"schema": "datagen.dne"
}
},
"id": "iteration.index",
"timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')",
12 changes: 12 additions & 0 deletions tests/schema.json
@@ -3,6 +3,10 @@
"_meta": {
"topic": "mz_datagen_users",
"key": "id",
"proto": {
Contributor: Same comment. Let's not change existing tests to require proto.

Author: I am adding a schema-proto.json file to use instead.

"dir": "./tests",
"schema": "datagen.User"
},
"relationships": [
{
"topic": "mz_datagen_posts",
@@ -28,6 +32,10 @@
"_meta": {
"topic": "mz_datagen_posts",
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.Post"
},
"relationships": [
{
"topic": "mz_datagen_comments",
@@ -46,6 +54,10 @@
"_meta": {
"topic": "mz_datagen_comments",
"key": "id",
"proto": {
"dir": "./tests",
"schema": "datagen.Comment"
},
"relationships": [
{
"topic": "mz_datagen_users",
38 changes: 38 additions & 0 deletions tests/schema.proto
@@ -0,0 +1,38 @@
syntax = "proto3";

package datagen;

// Definition for the User data
message User {
message Nested {
string phone = 1;
string website = 2;
}

int32 id = 1; // Assuming IDs are integers
string name = 2;
string email = 3;
string phone = 4;
string website = 5;
string city = 6;
string company = 7;
Nested nested = 8; // Nested message for phone and website
}

// Definition for the Post data
message Post {
int32 id = 1;
int32 user_id = 2; // Assuming this is a reference to a User ID
string title = 3;
string body = 4;
}

// Definition for the Comment data
message Comment {
int32 id = 1;
int32 user_id = 2; // Assuming this is a reference to a User ID
int32 post_id = 3; // Assuming this is a reference to a Post ID
string body = 4;
int32 views = 5; // Assuming views is an integer
int32 status = 6; // Assuming status is an integer representing some enum
}