-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Chuck Larrieu Casias <[email protected]>
- Loading branch information
1 parent
20cbc6a
commit 639dcae
Showing
16 changed files
with
223 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry"; | ||
import { Env } from "../utils/env.js"; | ||
import { OutputFormat } from "./outputFormat"; | ||
|
||
export class AvroFormat implements OutputFormat { | ||
private schemas: any = {}; | ||
private registry: SchemaRegistry; | ||
|
||
static async create(): Promise<AvroFormat> { | ||
const url = Env.required("SCHEMA_REGISTRY_URL"); | ||
const username = Env.optional("SCHEMA_REGISTRY_USERNAME", null); | ||
const password = Env.optional("SCHEMA_REGISTRY_PASSWORD", null); | ||
|
||
const configuration = { | ||
host: url | ||
}; | ||
|
||
if (password && username) { | ||
configuration["auth"] = { | ||
username: username, | ||
password: password | ||
}; | ||
} | ||
|
||
const registry = new SchemaRegistry(configuration); | ||
return new AvroFormat(registry); | ||
} | ||
|
||
constructor(registry: SchemaRegistry) { | ||
this.registry = registry; | ||
} | ||
|
||
async register(schema: any, topic: string): Promise<void> { | ||
const options = { subject: `${schema["name"]}-value` } | ||
try { | ||
const resp = await this.registry.register({ | ||
type: SchemaType.AVRO, | ||
schema: JSON.stringify(schema) | ||
}, | ||
options | ||
) | ||
|
||
alert({ | ||
type: `success`, | ||
name: `Schema registered!`, | ||
msg: `Subject: ${options.subject}, ID: ${resp.id}` | ||
}); | ||
|
||
this.schemas[topic] = { | ||
'schemaId': resp.id, | ||
'schema': schema | ||
}; | ||
} catch (error) { | ||
alert({ | ||
type: `error`, | ||
name: `Failed to register schema.`, | ||
msg: `${error}` | ||
}); | ||
|
||
process.exit(1); | ||
} | ||
} | ||
|
||
async encode(record: any, topic: string): Promise<Buffer> { | ||
const schemaId = this.schemas[topic]['schemaId'] | ||
const encodedRecord = await this.registry.encode(schemaId, record); | ||
return encodedRecord; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import { OutputFormat } from "./outputFormat"; | ||
|
||
export class JsonFormat implements OutputFormat { | ||
|
||
register(schema: any, topic: string): Promise<void> { | ||
return Promise.resolve(); | ||
} | ||
|
||
encode(record: any, _: string): Promise<Buffer> { | ||
const value = JSON.stringify(record); | ||
return Promise.resolve(Buffer.from(value)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
export interface OutputFormat { | ||
register(schema: any, topic: string): Promise<void>; | ||
|
||
encode(record: any, topic: string): Promise<Buffer>; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.