-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance Datagen with Protobuf Support and Nested Key References #112
base: main
Are you sure you want to change the base?
Changes from all commits
fc745eb
d4ce4f5
e285f22
a81f28d
06cfd1c
0028bc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import { OutputFormat } from "./outputFormat"; | ||
import protobuf from "protobufjs"; | ||
import alert from "cli-alerts"; | ||
import { globSync } from "glob"; | ||
|
||
export class ProtoFormat implements OutputFormat { | ||
private schemas: any = {}; | ||
private schemaFiles: Set<string>; | ||
|
||
static async getProtoSchemas(megaRecord: any, protoSchemaFiles: string[]) { | ||
|
||
if (!protoSchemaFiles || protoSchemaFiles.length === 0) { | ||
protoSchemaFiles = []; | ||
protoSchemaFiles.push(...(await ProtoFormat.getProtoSchemaFiles(megaRecord))); | ||
} | ||
|
||
const protoSchemas = {}; | ||
const protoRoot = protobuf.loadSync(protoSchemaFiles); | ||
for (const topic in megaRecord) { | ||
|
||
const protoSchema = {}; | ||
try { | ||
protoSchema["messageType"] = protoRoot.lookupType(megaRecord[topic].schema); | ||
protoSchema["name"] = topic | ||
protoSchema["namespace"] = megaRecord[topic].schema | ||
|
||
if (global.debug) { | ||
alert({ | ||
type: `success`, | ||
name: `Proto Schema for topic ${topic}:`, | ||
msg: `\n ${JSON.stringify(protoSchema, null, 2)}` | ||
}); | ||
} | ||
|
||
protoSchemas[topic] = protoSchema; | ||
} catch (error) { | ||
alert({ | ||
type: `error`, | ||
name: `protobuf lookup type error for schema ${megaRecord[topic].schema}`, | ||
msg: `${error}` | ||
}); | ||
process.exit(1); | ||
|
||
} | ||
} | ||
|
||
return protoSchemas; | ||
} | ||
|
||
static async getProtoSchemaFiles(megaRecord: any) { | ||
const protoFiles = new Set<string>(); | ||
for (const topic in megaRecord) { | ||
(await ProtoFormat.getProtoSchemaFilesSync(megaRecord[topic].schemaDir)).forEach(file => protoFiles.add(file)); | ||
} | ||
return protoFiles; | ||
} | ||
|
||
static async getProtoSchemaFilesSync(directory: string) { | ||
if (!directory) { | ||
return []; | ||
} | ||
return globSync(directory + (directory.endsWith("/") ? "" : "/") + "**/*.proto"); | ||
} | ||
|
||
async register(megaRecord: any): Promise<void> { | ||
this.schemaFiles = await ProtoFormat.getProtoSchemaFiles(megaRecord); | ||
this.schemas = await ProtoFormat.getProtoSchemas(megaRecord, Array.from(this.schemaFiles)); | ||
} | ||
|
||
async encode(record: any, topic: string): Promise<Buffer> { | ||
const messageType = this.schemas[topic]['messageType']; | ||
|
||
// check if the message is valid | ||
const error = messageType.verify(record); | ||
if (global.debug && error) { | ||
alert({ | ||
type: `warning`, | ||
name: `${record} with ${this.schemas[topic]['namespace']} is not valid`, | ||
msg: `${error}` | ||
}); | ||
} | ||
// if the message is not valid, convert plain object | ||
const message = error ? messageType.fromObject(record) : messageType.create(record); | ||
|
||
return messageType.encode(message).finish(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,12 @@ export async function generateMegaRecord(schema: any) { | |
}); | ||
} | ||
|
||
// specify the proto field for the topic | ||
if ("proto" in _meta) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be invalid if the user doesn’t specify proto output format. In other words, if they have proto in _meta but are using -f avro for output, then _meta.proto should be ignored. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem with this is that I would have to pass in the the |
||
megaRecord[topic]["schemaDir"] = _meta.proto.dir; | ||
megaRecord[topic]["schema"] = _meta.proto.schema; | ||
} | ||
|
||
// for records that already exist, generate values | ||
// for every field that doesn't already have a value. | ||
megaRecord[topic]["key"] = _meta.key | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export function accessRecordKey(path: string, record: any): any { | ||
return path?.split('.').reduce((level, key) => level && level[key], record); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
{ | ||
"_meta": { | ||
"topic": "air_quality", | ||
"key": "id", | ||
"proto": { | ||
"dir": "./tests", | ||
"schema": "datagen.dne" | ||
} | ||
}, | ||
"id": "iteration.index", | ||
"timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')", | ||
"location": { | ||
"latitude": "faker.datatype.number({ max: 90, min: -90})", | ||
"longitude": "faker.datatype.number({ max: 180, min: -180})" | ||
}, | ||
"pm25": "faker.datatype.float({ min: 10, max: 90 })", | ||
"pm10": "faker.datatype.float({ min: 10, max: 90 })", | ||
"temperature": "faker.datatype.float({ min: -10, max: 120 })", | ||
"humidity": "faker.datatype.float({ min: 0, max: 100 })" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
[ | ||
{ | ||
"_meta": { | ||
"topic": "mz_datagen_users", | ||
"key": "id", | ||
"proto": { | ||
"dir": "./tests", | ||
"schema": "datagen.User" | ||
}, | ||
"relationships": [ | ||
{ | ||
"topic": "mz_datagen_posts", | ||
"parent_field": "id", | ||
"child_field": "user_id", | ||
"records_per": 2 | ||
} | ||
] | ||
}, | ||
"nested": { | ||
"phone": "faker.phone.imei()", | ||
"website": "faker.internet.domainName()" | ||
}, | ||
"id": "faker.datatype.number(100)", | ||
"name": "faker.internet.userName()", | ||
"email": "faker.internet.exampleEmail()", | ||
"phone": "faker.phone.imei()", | ||
"website": "faker.internet.domainName()", | ||
"city": "faker.address.city()", | ||
"company": "faker.company.name()" | ||
}, | ||
{ | ||
"_meta": { | ||
"topic": "mz_datagen_posts", | ||
"key": "id", | ||
"proto": { | ||
"dir": "./tests", | ||
"schema": "datagen.Post" | ||
}, | ||
"relationships": [ | ||
{ | ||
"topic": "mz_datagen_comments", | ||
"parent_field": "id", | ||
"child_field": "post_id", | ||
"records_per": 2 | ||
} | ||
] | ||
}, | ||
"id": "faker.datatype.number(1000)", | ||
"user_id": "faker.datatype.number(100)", | ||
"title": "faker.lorem.sentence()", | ||
"body": "faker.lorem.paragraph()" | ||
}, | ||
{ | ||
"_meta": { | ||
"topic": "mz_datagen_comments", | ||
"key": "id", | ||
"proto": { | ||
"dir": "./tests", | ||
"schema": "datagen.Comment" | ||
}, | ||
"relationships": [ | ||
{ | ||
"topic": "mz_datagen_users", | ||
"parent_field": "user_id", | ||
"child_field": "id", | ||
"records_per": 1 | ||
} | ||
] | ||
}, | ||
"id": "faker.datatype.number(2000)", | ||
"user_id": "faker.datatype.number(100)", | ||
"body": "faker.lorem.paragraph()", | ||
"post_id": "faker.datatype.number(1000)", | ||
"views": "faker.datatype.number({min: 100, max: 1000})", | ||
"status": "faker.datatype.number(1)" | ||
} | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
syntax = "proto3"; | ||
|
||
package datagen; | ||
|
||
// Definition for the User data | ||
message User { | ||
message Nested { | ||
string phone = 1; | ||
string website = 2; | ||
} | ||
|
||
int32 id = 1; // Assuming IDs are integers | ||
string name = 2; | ||
string email = 3; | ||
string phone = 4; | ||
string website = 5; | ||
string city = 6; | ||
string company = 7; | ||
Nested nested = 8; // Nested message for phone and website | ||
} | ||
|
||
// Definition for the Post data | ||
message Post { | ||
int32 id = 1; | ||
int32 user_id = 2; // Assuming this is a reference to a User ID | ||
string title = 3; | ||
string body = 4; | ||
} | ||
|
||
// Definition for the Comment data | ||
message Comment { | ||
int32 id = 1; | ||
int32 user_id = 2; // Assuming this is a reference to a User ID | ||
int32 post_id = 3; // Assuming this is a reference to a Post ID | ||
string body = 4; | ||
int32 views = 5; // Assuming views is an integer | ||
int32 status = 6; // Assuming status is an integer representing some enum | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for that PR, it looks great!
One thing that I believe might be missing is that the schemas do not get registered in the CSR, so not 100% sure how consumers will decode the Protobuf messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You do not need a CSR to use protos with Kafka. Often, consumers will have have their own references to the schemas. The datagen tool working with protos should be independent on how consumers get the protos. The datagen tool is about message production. How consumers decrypt the messages they have the option of
Is (2) a requirement for this PR?