Skip to content

Commit

Permalink
YAR-14287: Refactor: resolve many lints (MaterializeInc#88)
Browse files Browse the repository at this point in the history
* Refactor: prefer const

* Refactor: Array type using 'Array<T>' is forbidden. Use 'T[]' instead

* Refactor: allow calls to console

* Refactor: triple equals

* Refactor: variable name clashes with keyword/type

* Refactor: non-arrow functions are forbidden

* Refactor: Expected property shorthand in object literal

* Refactor: Missing radix parameter

* Refactor: shadow name

* Refactor: format code
  • Loading branch information
sjwiesman authored and Andre Rosa committed Feb 12, 2024
1 parent b5aab9c commit effb242
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 77 deletions.
8 changes: 4 additions & 4 deletions datagen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ program
new Option(
'-n, --number <int>',
'Number of records to generate. For infinite records, use -1'
).default('10').argParser((value) => parseInt(value))
).default('10').argParser((value) => parseInt(value, 10))
)
.option('-c, --clean', 'Clean (delete) Kafka topics and schema subjects previously created')
.option('-dr, --dry-run', 'Dry run (no data will be produced to Kafka)')
Expand Down Expand Up @@ -105,8 +105,8 @@ if (!global.wait) {
}

if (global.clean) {
let topics = []
for (let table of parsedSchema) {
const topics = []
for (const table of parsedSchema) {
topics.push(table._meta.topic)
}
await cleanKafka(options.format, topics)
Expand All @@ -117,7 +117,7 @@ if (!global.wait) {
await dataGenerator({
format: options.format,
schema: parsedSchema,
number: options.number
iterations: options.number
})

await end();
Expand Down
20 changes: 10 additions & 10 deletions src/dataGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import { OutputFormat } from './formats/outputFormat.js';
import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';

async function* asyncGenerator(number: number) {
async function* asyncGenerator(iterations: number) {
let i = 0;
// If number is -1, generate infinite records
if (number === -1) {
if (iterations === -1) {
while (true) {
yield i;
i++;
}
} else {
for (i; i < number; i++) {
for (i; i < iterations; i++) {
yield i;
}
}
Expand All @@ -35,11 +35,11 @@ function sleep(s: number) {
export default async function dataGenerator({
format,
schema,
number
iterations
}: {
format: string;
schema: string;
number: number;
iterations: number;
}): Promise<void> {

let payload: string;
Expand All @@ -60,14 +60,14 @@ export default async function dataGenerator({
producer = await KafkaProducer.create(outputFormat);
}

for await (const iteration of asyncGenerator(number)) {
for await (const iteration of asyncGenerator(iterations)) {
global.iterationIndex = iteration;
let megaRecord = await generateMegaRecord(schema);
const megaRecord = await generateMegaRecord(schema);

if (iteration == 0) {
if (iteration === 0) {
await producer?.prepare(megaRecord);
if (global.debug && global.dryRun && format == 'avro') {
let avroSchemas = await AvroFormat.getAvroSchemas(megaRecord);
if (global.debug && global.dryRun && format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
}
}

Expand Down
28 changes: 14 additions & 14 deletions src/formats/avroFormat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export class AvroFormat implements OutputFormat {

if (password && username) {
configuration["auth"] = {
username: username,
password: password
username,
password
};
}

Expand All @@ -32,10 +32,10 @@ export class AvroFormat implements OutputFormat {
constructor(registry: SchemaRegistry) {
this.registry = registry;
}

private static nameHook() {
let index = 0;
return function (schema, opts) {
return (schema, opts) => {
switch (schema.type) {
case 'enum':
case 'fixed':
Expand All @@ -46,16 +46,16 @@ export class AvroFormat implements OutputFormat {
}
};
}

// @ts-ignore
static async getAvroSchemas(megaRecord: any) {
let avroSchemas = {};
for (let topic in megaRecord) {
const avroSchemas = {};
for (const topic in megaRecord) {
// @ts-ignore
let avroSchema = Type.forValue(megaRecord[topic].records[0], { typeHook: this.nameHook() }).schema();
const avroSchema = Type.forValue(megaRecord[topic].records[0], { typeHook: this.nameHook() }).schema();
avroSchema["name"] = topic
avroSchema["namespace"] = "com.materialize"

if (global.debug) {
alert({
type: `success`,
Expand All @@ -72,22 +72,22 @@ export class AvroFormat implements OutputFormat {
async register(megaRecord: any): Promise<void> {
const avroSchemas = await AvroFormat.getAvroSchemas(megaRecord);
for (const topic in avroSchemas) {
let options = { subject: `${topic}-value` }
let avroSchema = avroSchemas[topic]
const options = { subject: `${topic}-value` }
const avroSchema = avroSchemas[topic]
try {
const resp = await this.registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(avroSchema)
},
options
)

alert({
type: `success`,
name: `Schema registered!`,
msg: `Subject: ${options.subject}, ID: ${resp.id}`
});

this.schemas[topic] = {
'schemaId': resp.id,
'schema': avroSchema
Expand All @@ -98,7 +98,7 @@ export class AvroFormat implements OutputFormat {
name: `Failed to register schema.`,
msg: `${error}`
});

process.exit(1);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/kafka/cleanKafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async function deleteSchemaSubjects(topics: any): Promise<void> {
const schemaRegistryUrl = Env.required("SCHEMA_REGISTRY_URL");

for await (const topic of topics) {
let url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
const url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
await axios.delete(
url,
{
Expand Down Expand Up @@ -50,15 +50,15 @@ export default async function cleanKafka(format: string, topics: any): Promise<v
await admin.connect();
try {
await admin.deleteTopics({
topics: topics
topics
})
console.log(`deleted Kafka topics ${topics}`)
} catch (error) {
console.log(error)
}
await admin.disconnect();

if (format != 'avro') {
if (format !== 'avro') {
console.log("Skipping Schema Registry")
} else {
await deleteSchemaSubjects(topics);
Expand Down
8 changes: 4 additions & 4 deletions src/kafka/createTopics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export default async function createTopics(megaRecord: any): Promise<void> {
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
let topicConfigs = [];
let replicationFactor = await getReplicationFactor(admin);
const topicConfigs = [];
const replicationFactor = await getReplicationFactor(admin);

for (const topic in megaRecord) {

Expand All @@ -22,9 +22,9 @@ export default async function createTopics(megaRecord: any): Promise<void> {
});
topicConfigs.push(
{
topic: topic,
topic,
replicationFactor,
numPartitions: 1,
replicationFactor: replicationFactor,
configEntries: [
{
name: 'cleanup.policy',
Expand Down
3 changes: 1 addition & 2 deletions src/kafka/kafkaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ export default async function kafkaConfig() {
connectionTimeout: 10_000,
authenticationTimeout: 10_000
};
const kafka = new Kafka(conf);
return kafka;
return new Kafka(conf);
}

if (sslCaLocation && sslCertLocation && sslKeyLocation) {
Expand Down
4 changes: 2 additions & 2 deletions src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ export class KafkaProducer {
}

async send(key: any, value: any, topic: string) {
let encoded = await this.format.encode(value, topic);
const encoded = await this.format.encode(value, topic);
await this.producer.send({
topic: topic,
topic,
messages: [{
key: key?.toString(),
value: encoded
Expand Down
34 changes: 16 additions & 18 deletions src/schemas/generateMegaRecord.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
import { faker } from '@faker-js/faker';
import alert from 'cli-alerts';


export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}){
export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}) {
// helper function to generate a record from json schema with faker data

for (const field in fakerRecord) {
if (field in generatedRecord) {
continue
}
if (typeof fakerRecord[field] === 'object') {
generatedRecord[field] = await generateRandomRecord(fakerRecord[field])
continue
}
if (fakerRecord[field] === 'iteration.index'){
}

if (fakerRecord[field] === 'iteration.index') {
generatedRecord[field] = global.iterationIndex + 1;
continue;
}

if (fakerRecord[field].match("faker\..*")) {
try {
let generatedValue =
let generatedValue =
(new Function(
'faker',
`return ${fakerRecord[field]};`
))(faker);
))(faker);
if (generatedValue instanceof Date) {
generatedValue = generatedValue.toISOString();
}
generatedRecord[field] = generatedValue;

} catch (error) {
alert({
type: `error`,
Expand All @@ -56,7 +54,7 @@ export async function generateMegaRecord(schema: any) {
// goal is to return a "mega record" with structure
// {topic: {key: the topic key field name, records: [list of records to send to Kafka]}
// where the records obey the relationships specified in the input schema file
let megaRecord = {} as any;
const megaRecord = {} as any;
for (const table of schema) {
const { _meta, ...fakerRecord } = table;
let topic = _meta.topic;
Expand All @@ -67,7 +65,7 @@ export async function generateMegaRecord(schema: any) {
// populate the initial record for the topic
if (!megaRecord[topic]) {
megaRecord[topic] = { "key": null, "records": [] };
let newRecord = await generateRandomRecord(fakerRecord);
const newRecord = await generateRandomRecord(fakerRecord);
megaRecord[topic].records.push(newRecord);
}

Expand All @@ -86,7 +84,7 @@ export async function generateMegaRecord(schema: any) {
// for records that already exist, generate values
// for every field that doesn't already have a value.
megaRecord[topic]["key"] = _meta.key
for (let existingRecord of megaRecord[topic]["records"]){
for (let existingRecord of megaRecord[topic]["records"]) {
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}

Expand All @@ -102,17 +100,17 @@ export async function generateMegaRecord(schema: any) {
for (const existingRecord of megaRecord[topic].records) {
// ensure the new record obeys the foreign key constraint
// specified in the relationship
let newRecords = [];
let existingValue = existingRecord[relationship.parent_field];
const newRecords = [];
const existingValue = existingRecord[relationship.parent_field];
if (Array.isArray(existingValue)) {
for (let i = 0; i < existingValue.length; i++) {
let newRecord = {};
const newRecord = {};
newRecord[relationship.child_field] = existingValue[i]
newRecords.push(newRecord);
}
} else {
for (let i = 1; i <= relationship.records_per; i++) {
let newRecord = {};
const newRecord = {};
newRecord[relationship.child_field] = existingValue;
newRecords.push(newRecord);
}
Expand All @@ -130,12 +128,12 @@ export async function generateMegaRecord(schema: any) {
// We sweep through one more time to make sure all the records have all the fields they need without
// overriding existing fields that have been populated already.
for (const table of schema) {
const {_meta, ...fakerRecord} = table;
const { _meta, ...fakerRecord } = table;
let topic = _meta.topic;
if (global.prefix) {
topic = `${global.prefix}_${topic}`;
}
for (let existingRecord of megaRecord[topic].records){
for (let existingRecord of megaRecord[topic].records) {
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/schemas/parseAvroSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ export async function parseAvroSchema(schemaFile: any) {
});

if (global.debug) {
const parsed = avro.parse(schemaFile);
console.log(parsed);
const avroSchema = avro.parse(schemaFile);
console.log(avroSchema);
}

let schema = [];
let parsed = JSON.parse(schemaFile);
const parsed = JSON.parse(schemaFile);
schema.push(parsed);

schema = await convertAvroSchemaToJson(schema);
Expand All @@ -24,28 +24,28 @@ export async function parseAvroSchema(schemaFile: any) {


function convertAvroSchemaToJson(schema: any, nonRoot: boolean = false): any {
let jsonSchema = [];
const jsonSchema = [];
schema.forEach(table => {
let schema = {};
if(!nonRoot) {
if (!nonRoot) {
schema = {
_meta: {
topic: table.name
}
};
}
table.fields.forEach(column => {

if ((column.type === 'record')) {

schema[column.name] = convertAvroSchemaToJson(column.type, true)[0];
schema[column.name] = convertAvroSchemaToJson(column.type, true)[0];

} else if (typeof column.type === "object" && !Array.isArray(column.type) && column.type.type === 'record') {

} else if(typeof column.type === "object" && !Array.isArray(column.type) && column.type.type === 'record') {
schema[column.name] = convertAvroSchemaToJson([column.type], true)[0];
schema[column.name] = convertAvroSchemaToJson([column.type], true)[0];

}

}

else {
if (Array.isArray(column.type)) {
if (column.type.length === 2 && column.type[0] === 'null') {
Expand Down
Loading

0 comments on commit effb242

Please sign in to comment.