From a24851082df85736396f9cfe66ff5b9bb70d6a87 Mon Sep 17 00:00:00 2001
From: Bobby Iliev
Date: Wed, 20 Sep 2023 17:58:20 +0300
Subject: [PATCH] Refactor recordsize logic (#108)

* Abstract the record size logic

* Fix Kafka Dry run

* Make the recordSizePayload column nullable
---
 .github/workflows/integration.yaml |  2 +-
 src/kafkaDataGenerator.ts          | 11 +----------
 src/postgres/createTables.ts       | 15 +++++++++++++--
 src/postgresDataGenerator.ts       | 23 ++++++++++++-----------
 src/schemas/generateMegaRecord.ts  | 11 +++++++++++
 src/webhookDataGenerator.ts        |  9 ---------
 tests/datagen.test.ts              |  5 +++++
 7 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml
index b2929a0..642ca90 100644
--- a/.github/workflows/integration.yaml
+++ b/.github/workflows/integration.yaml
@@ -52,7 +52,7 @@ jobs:
         run: docker exec datagen datagen -s /tests/products.sql -f postgres -n 3
 
       - name: Produce data to Postgres with multiple tables
-        run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3
+        run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3 -rs 1000
 
       - name: Docker Compose Down
         run: docker compose down -v
diff --git a/src/kafkaDataGenerator.ts b/src/kafkaDataGenerator.ts
index 07ceff6..316d4c9 100644
--- a/src/kafkaDataGenerator.ts
+++ b/src/kafkaDataGenerator.ts
@@ -1,5 +1,4 @@
 import alert from 'cli-alerts';
-import recordSize from './utils/recordSize.js';
 import { KafkaProducer } from './kafka/producer.js';
 import { generateMegaRecord } from './schemas/generateMegaRecord.js';
 import { OutputFormat } from './formats/outputFormat.js';
@@ -20,11 +19,6 @@ export default async function kafkaDataGenerator({
     initialSchema: string;
 }): Promise<void> {
 
-    let payload: string;
-    if (global.recordSize) {
-        payload = await recordSize();
-    }
-
     let producer: KafkaProducer | null = null;
     if (global.dryRun !== true) {
         let outputFormat: OutputFormat;
@@ -55,16 +49,13 @@ export default async function kafkaDataGenerator({
                 key = record[megaRecord[topic].key];
             }
 
-            if (global.recordSize) {
-                record.recordSizePayload = payload;
-            }
-
            if (global.dryRun) {
                alert({
                    type: `success`,
                    name: `Dry run: Skipping record production...`,
                    msg: `\n Topic: ${topic} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
                });
+                continue;
            }
 
            await producer?.send(key, record, topic);
diff --git a/src/postgres/createTables.ts b/src/postgres/createTables.ts
index c8a2c69..d37c89f 100644
--- a/src/postgres/createTables.ts
+++ b/src/postgres/createTables.ts
@@ -54,10 +54,21 @@ export default async function createTables(schema: any, initialSchemaPath: strin
     const queries = initialSchema.split(';');
 
     for (const query of queries) {
+        let extendedQuery = query.trim();
+        // Add ; to the end of the query if it's missing
+        if (!extendedQuery.endsWith(';')) {
+            extendedQuery += ';';
+        }
+        // If the global option is enabled, add the recordSizePayload column to the table creation query
+        if (global.recordSize && extendedQuery.toLowerCase().startsWith('create table')) {
+            extendedQuery = extendedQuery.replace(/\);/g, ', recordSizePayload TEXT NULL);');
+        }
+
         try {
-            if (query.trim()) {
-                const correctedSql = query.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
+            if (extendedQuery) {
+                const correctedSql = extendedQuery.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
                 await client.query(correctedSql);
+                console.log(correctedSql);
             }
         } catch (error) {
             alert({
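Note on src/postgres/createTables.ts: the column injection above relies on a global `/\);/g` replace, so it assumes the statement's only `);` is the closing one (a `);` inside a string literal would also be rewritten; the schemas exercised by the tests do not hit that case). A minimal sketch of the transformation, using an illustrative table that is not part of this patch:

    // Sketch: what the CREATE TABLE rewrite produces for a typical statement.
    const query = 'CREATE TABLE products (id INT PRIMARY KEY, name TEXT)';
    let extendedQuery = query.trim();
    if (!extendedQuery.endsWith(';')) {
        extendedQuery += ';';
    }
    extendedQuery = extendedQuery.replace(/\);/g, ', recordSizePayload TEXT NULL);');
    console.log(extendedQuery);
    // CREATE TABLE products (id INT PRIMARY KEY, name TEXT, recordSizePayload TEXT NULL);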
diff --git a/src/postgresDataGenerator.ts b/src/postgresDataGenerator.ts
index 0a15a13..32485cb 100644
--- a/src/postgresDataGenerator.ts
+++ b/src/postgresDataGenerator.ts
@@ -17,7 +17,6 @@ export default async function postgresDataGenerator({
     iterations: number;
     initialSchema: string;
 }): Promise<void> {
-
     // Database client setup
     let client = null;
     if (global.dryRun) {
@@ -49,28 +48,28 @@ export default async function postgresDataGenerator({
                 name: `Creating tables...`,
                 msg: ``
             });
-            client && await createTables(schema, initialSchema);
+            client && (await createTables(schema, initialSchema));
         }
     }
 
     for (const table in megaRecord) {
         for await (const record of megaRecord[table].records) {
-            console.log(`\n Table: ${table} \n Record: ${JSON.stringify(record)}`);
+            console.log(
+                `\n Table: ${table} \n Record: ${JSON.stringify(record)}`
+            );
 
             let key = null;
             if (record[megaRecord[table].key]) {
                 key = record[megaRecord[table].key];
             }
 
-            if (global.recordSize) {
-                record.recordSizePayload = payload;
-            }
-
             if (global.dryRun) {
                 alert({
                     type: `success`,
                     name: `Dry run: Skipping record production...`,
-                    msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
+                    msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(
+                        record
+                    )}`
                 });
             }
 
@@ -78,9 +77,11 @@ export default async function postgresDataGenerator({
             if (!global.dryRun) {
                 try {
                     const values = Object.values(record);
-                    const placeholders = values.map((_, index) => `$${index + 1}`).join(', ');
+                    const placeholders = values
+                        .map((_, index) => `$${index + 1}`)
+                        .join(', ');
                     const query = `INSERT INTO ${table} VALUES (${placeholders})`;
-                    client && await client.query(query, values);
+                    client && (await client.query(query, values));
                 } catch (err) {
                     console.error(err);
                 }
@@ -91,5 +92,5 @@ export default async function postgresDataGenerator({
         await sleep(global.wait);
     }
 
-    client && await client.end();
+    client && (await client.end());
 }
diff --git a/src/schemas/generateMegaRecord.ts b/src/schemas/generateMegaRecord.ts
index a7f8d58..9f6a193 100644
--- a/src/schemas/generateMegaRecord.ts
+++ b/src/schemas/generateMegaRecord.ts
@@ -1,5 +1,6 @@
 import { faker } from '@faker-js/faker';
 import alert from 'cli-alerts';
+import recordSize from '../utils/recordSize.js';
 
 export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}) {
     // helper function to generate a record from json schema with faker data
@@ -137,5 +138,15 @@ export async function generateMegaRecord(schema: any) {
             existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
         }
     }
+
+    if (global.recordSize) {
+        for (const topic in megaRecord) {
+            let payload: string = await recordSize();
+            for (let record of megaRecord[topic].records) {
+                record["recordSizePayload"] = payload;
+            }
+        }
+    }
+
     return megaRecord;
 }
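Note on src/schemas/generateMegaRecord.ts: the hunk above is now the single home for the record-size logic that the Kafka, Postgres, and webhook generators previously duplicated; the payload is generated once per topic and stamped on every record before any destination-specific code runs. The sketch below shows the resulting record shape; `fillerPayload` is a hypothetical stand-in for src/utils/recordSize.js, which this patch does not touch:

    // Sketch of the consolidated flow (ESM module, hence top-level await).
    const fillerPayload = async (bytes: number): Promise<string> => 'x'.repeat(bytes);

    const megaRecord: Record<string, { records: any[] }> = {
        users: { records: [{ id: 1 }, { id: 2 }] } // illustrative topic, not from this patch
    };

    for (const topic in megaRecord) {
        const payload = await fillerPayload(100); // one payload per topic, as in the hunk
        for (const record of megaRecord[topic].records) {
            record['recordSizePayload'] = payload;
        }
    }
    // Each record now carries the padding: { id: 1, recordSizePayload: 'xxx...' }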
diff --git a/src/webhookDataGenerator.ts b/src/webhookDataGenerator.ts
index ecadbe0..afb7d6b 100644
--- a/src/webhookDataGenerator.ts
+++ b/src/webhookDataGenerator.ts
@@ -2,7 +2,6 @@ import alert from 'cli-alerts';
 import { generateMegaRecord } from './schemas/generateMegaRecord.js';
 import { OutputFormat } from './formats/outputFormat.js';
 import sleep from './utils/sleep.js';
-import recordSize from './utils/recordSize.js';
 import asyncGenerator from './utils/asyncGenerator.js';
 import webhookConfig from './webhook/webhookConfig.js';
 
@@ -36,11 +35,6 @@ export default async function webhookDataGenerator({
         client = await webhookConfig();
     }
 
-    let payload: string;
-    if (global.recordSize) {
-        payload = await recordSize();
-    }
-
     for await (const iteration of asyncGenerator(iterations)) {
         global.iterationIndex = iteration;
         const megaRecord = await generateMegaRecord(schema);
@@ -58,9 +52,6 @@ export default async function webhookDataGenerator({
     const handler = async (megaRecord: any, iteration: number) => {
         for (const endpoint in megaRecord) {
             for await (const record of megaRecord[endpoint].records) {
-                if (global.recordSize) {
-                    record.recordSizePayload = payload;
-                }
 
                 if (global.dryRun) {
                     alert({
diff --git a/tests/datagen.test.ts b/tests/datagen.test.ts
index 8f73f47..7be283d 100644
--- a/tests/datagen.test.ts
+++ b/tests/datagen.test.ts
@@ -77,6 +77,11 @@ describe('Test record size', () => {
         const output = datagen(`-s ${schema} -n 2 -rs 100`);
         expect(output).toContain('recordSizePayload');
     });
+    test('should contain the recordSizePayload if record size is set with Postgres destinations', () => {
+        const schema = './tests/products.sql';
+        const output = datagen(`-s ${schema} -f postgres -n 2 -rs 100`);
+        expect(output).toContain('recordSizePayload');
+    });
 });
 
 describe('Test sql output', () => {
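Because the stamping now happens inside generateMegaRecord, dry runs inherit the payload too, and the Kafka generator's new `continue` keeps a dry run from falling through to `producer?.send(...)`. A hypothetical companion test in the same style as the one added above could pin that down (the `--dry-run` flag name is assumed here, not shown in this diff):

    test('should contain the recordSizePayload on a dry run', () => {
        const schema = './tests/products.sql';
        const output = datagen(`-s ${schema} -n 2 -rs 100 --dry-run`);
        expect(output).toContain('recordSizePayload');
    });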