From da9957e9114346b86fe66053e11ebb800b62d995 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Fri, 3 Mar 2023 13:32:48 -0800 Subject: [PATCH 1/4] implement feature to clean topics and schema subjects datagen creates --- datagen.js | 25 +++++-- package-lock.json | 156 +++++++++++++++++++++++++++++++++++++++- package.json | 3 +- src/dataGenerator.js | 4 +- src/kafka/cleanKafka.js | 45 ++++++++++++ 5 files changed, 222 insertions(+), 11 deletions(-) create mode 100644 src/kafka/cleanKafka.js diff --git a/datagen.js b/datagen.js index 06c6325..91e15be 100755 --- a/datagen.js +++ b/datagen.js @@ -12,7 +12,8 @@ const alert = require('cli-alerts'); const { parseSqlSchema } = require('./src/schemas/parseSqlSchema'); const { parseAvroSchema } = require('./src/schemas/parseAvroSchema'); const { parseJsonSchema } = require('./src/schemas/parseJsonSchema'); -const jsonDataGenerator = require('./src/dataGenerator'); +const cleanKafka = require('./src/kafka/cleanKafka'); +const dataGenerator = require('./src/dataGenerator'); const fs = require('fs'); const { program, Option } = require('commander'); @@ -36,6 +37,11 @@ program .choices(['true', 'false']) .default('false') ) + .addOption( + new Option('-c, --clean ') + .choices(['true', 'false']) + .default('false') + ) .addOption( new Option('-dr, --dry-run ', 'Dry run (no data will be produced') .choices(['true', 'false']) @@ -60,6 +66,8 @@ if (!fs.existsSync(options.schema)) { global.debug = options.debug; global.recordSize = options.recordSize; global.wait = options.wait; +global.clean = options.clean; +global.dryRun = options.dryRun; if (debug === 'true') { console.log(options); @@ -106,13 +114,20 @@ if (!wait) { process.exit(); } + if (clean == 'true') { + let topics = [] + for (table of parsedSchema){ + topics.push(table._meta.topic) + } + await cleanKafka(options.format,topics) + process.exit(0); + } + // Generate data - await jsonDataGenerator({ + await dataGenerator({ format: options.format, schema: parsedSchema, - number: options.number, - dryRun: options.dryRun, - debug: options.debug + number: options.number }) await end(); diff --git a/package-lock.json b/package-lock.json index ecab04d..ec131d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@materializeinc/datagen", - "version": "0.1.0", + "version": "0.1.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@materializeinc/datagen", - "version": "0.1.0", + "version": "0.1.1", "license": "Apache-2.0", "dependencies": { "@avro/types": "^1.0.25", @@ -14,6 +14,7 @@ "@kafkajs/confluent-schema-registry": "^3.3.0", "arg": "^5.0.2", "avro-js": "^1.11.1", + "axios": "^1.3.4", "cli-alerts": "^1.2.2", "cli-handle-unhandled": "^1.1.1", "cli-welcome": "^2.2.2", @@ -1283,6 +1284,11 @@ "sprintf-js": "~1.0.2" } }, + "node_modules/asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" + }, "node_modules/avro-js": { "version": "1.11.1", "resolved": "https://registry.npmjs.org/avro-js/-/avro-js-1.11.1.tgz", @@ -1299,6 +1305,16 @@ "node": ">=0.11" } }, + "node_modules/axios": { + "version": "1.3.4", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.3.4.tgz", + "integrity": "sha512-toYm+Bsyl6VC5wSkfkbbNB6ROv7KY93PEBBL6xyDczaIHasAiv4wPqQ/c4RjoQzipxRD2W5g21cOqQulZ7rHwQ==", + "dependencies": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/babel-jest": { "version": "29.3.1", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.3.1.tgz", @@ -1764,6 +1780,17 @@ "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" }, + "node_modules/combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "dependencies": { + "delayed-stream": "~1.0.0" + }, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/commander": { "version": "9.4.1", "resolved": "https://registry.npmjs.org/commander/-/commander-9.4.1.tgz", @@ -1836,6 +1863,14 @@ "node": ">=0.10.0" } }, + "node_modules/delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/detect-newline": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/detect-newline/-/detect-newline-3.1.0.tgz", @@ -1995,6 +2030,38 @@ "node": ">=8" } }, + "node_modules/follow-redirects": { + "version": "1.15.2", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.2.tgz", + "integrity": "sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, + "node_modules/form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -3157,6 +3224,25 @@ "node": ">=8.6" } }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/mimic-fn": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-2.1.0.tgz", @@ -3450,6 +3536,11 @@ "pbts": "bin/pbts" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "node_modules/punycode": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.0.tgz", @@ -4942,6 +5033,11 @@ "sprintf-js": "~1.0.2" } }, + "asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" + }, "avro-js": { "version": "1.11.1", "resolved": "https://registry.npmjs.org/avro-js/-/avro-js-1.11.1.tgz", @@ -4955,6 +5051,16 @@ "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.7.tgz", "integrity": "sha512-9cYNccliXZDByFsFliVwk5GvTq058Fj513CiR4E60ndDwmuXzTJEp/Bp8FyuRmGyYupLjHLs+JA9/CBoVS4/NQ==" }, + "axios": { + "version": "1.3.4", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.3.4.tgz", + "integrity": "sha512-toYm+Bsyl6VC5wSkfkbbNB6ROv7KY93PEBBL6xyDczaIHasAiv4wPqQ/c4RjoQzipxRD2W5g21cOqQulZ7rHwQ==", + "requires": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, "babel-jest": { "version": "29.3.1", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.3.1.tgz", @@ -5315,6 +5421,14 @@ "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" }, + "combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "requires": { + "delayed-stream": "~1.0.0" + } + }, "commander": { "version": "9.4.1", "resolved": "https://registry.npmjs.org/commander/-/commander-9.4.1.tgz", @@ -5369,6 +5483,11 @@ "integrity": "sha512-FJ3UgI4gIl+PHZm53knsuSFpE+nESMr7M4v9QcgB7S63Kj/6WqMiFQJpBBYz1Pt+66bZpP3Q7Lye0Oo9MPKEdg==", "dev": true }, + "delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==" + }, "detect-newline": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/detect-newline/-/detect-newline-3.1.0.tgz", @@ -5488,6 +5607,21 @@ "path-exists": "^4.0.0" } }, + "follow-redirects": { + "version": "1.15.2", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.2.tgz", + "integrity": "sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==" + }, + "form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "requires": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + }, "fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -6359,6 +6493,19 @@ "picomatch": "^2.3.1" } }, + "mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==" + }, + "mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "requires": { + "mime-db": "1.52.0" + } + }, "mimic-fn": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-2.1.0.tgz", @@ -6571,6 +6718,11 @@ "long": "^4.0.0" } }, + "proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "punycode": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.0.tgz", diff --git a/package.json b/package.json index 9a18807..b1eee5c 100644 --- a/package.json +++ b/package.json @@ -30,10 +30,11 @@ }, "dependencies": { "@avro/types": "^1.0.25", - "@kafkajs/confluent-schema-registry": "^3.3.0", "@faker-js/faker": "^7.6.0", + "@kafkajs/confluent-schema-registry": "^3.3.0", "arg": "^5.0.2", "avro-js": "^1.11.1", + "axios": "^1.3.4", "cli-alerts": "^1.2.2", "cli-handle-unhandled": "^1.1.1", "cli-welcome": "^2.2.2", diff --git a/src/dataGenerator.js b/src/dataGenerator.js index 91f6084..9a79352 100644 --- a/src/dataGenerator.js +++ b/src/dataGenerator.js @@ -91,9 +91,7 @@ async function prepareSchema(megaRecord, topic, registry, avroSchemas) { module.exports = async ({ format, schema, - number, - dryRun = false, - debug = false + number }) => { let payload; if (recordSize) { diff --git a/src/kafka/cleanKafka.js b/src/kafka/cleanKafka.js new file mode 100644 index 0000000..0c67062 --- /dev/null +++ b/src/kafka/cleanKafka.js @@ -0,0 +1,45 @@ +const kafkaConfig = require('./kafkaConfig'); +const axios = require('axios'); +const dotenv = require('dotenv'); + + +async function deleteSchemaSubjects(topics) { + dotenv.config(); + for (const topic of topics){ + let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=true`; + axios.delete( + url, + { + auth: { + username: process.env.SCHEMA_REGISTRY_USERNAME, + password: process.env.SCHEMA_REGISTRY_PASSWORD + } + } + ).then((response) => { + console.log(response.data); + }) + .catch((error) => { + console.error(error); + }); + } +} + +module.exports = async (format, topics) => { + + if (dryRun == 'true') { + console.log("This is a dry run, so no resources will be deleted") + return + } + const kafka = kafkaConfig(); + const admin = kafka.admin(); + await admin.connect(); + await admin.deleteTopics({ + topics: topics + }) + if (format != 'avro') { + console.log("Skipping Schema Registry") + } else { + await deleteSchemaSubjects(topics); + } + +}; \ No newline at end of file From fbb4350af13ec95e70dad760ef999b163a960cb1 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Fri, 3 Mar 2023 14:02:19 -0800 Subject: [PATCH 2/4] better error handling and soft delete the subject --- src/kafka/cleanKafka.js | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/kafka/cleanKafka.js b/src/kafka/cleanKafka.js index 0c67062..08158c6 100644 --- a/src/kafka/cleanKafka.js +++ b/src/kafka/cleanKafka.js @@ -5,9 +5,13 @@ const dotenv = require('dotenv'); async function deleteSchemaSubjects(topics) { dotenv.config(); - for (const topic of topics){ - let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=true`; - axios.delete( + if (!process.env.SCHEMA_REGISTRY_URL) { + console.error("Please set SCHEMA_REGISTRY_URL"); + process.exit(); + } + for await (const topic of topics){ + let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`; + await axios.delete( url, { auth: { @@ -16,6 +20,7 @@ async function deleteSchemaSubjects(topics) { } } ).then((response) => { + console.log(response.status); console.log(response.data); }) .catch((error) => { @@ -33,9 +38,15 @@ module.exports = async (format, topics) => { const kafka = kafkaConfig(); const admin = kafka.admin(); await admin.connect(); - await admin.deleteTopics({ - topics: topics - }) + try { + await admin.deleteTopics({ + topics: topics + }) + } catch (error) { + console.log(error) + } + await admin.disconnect(); + if (format != 'avro') { console.log("Skipping Schema Registry") } else { From 8ec6b8d9c9b233cf56d3d7e44657f800a46eb08e Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Fri, 3 Mar 2023 14:14:34 -0800 Subject: [PATCH 3/4] limit the axios error message for schema registry --- src/kafka/cleanKafka.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/kafka/cleanKafka.js b/src/kafka/cleanKafka.js index 08158c6..3c90f39 100644 --- a/src/kafka/cleanKafka.js +++ b/src/kafka/cleanKafka.js @@ -21,10 +21,11 @@ async function deleteSchemaSubjects(topics) { } ).then((response) => { console.log(response.status); - console.log(response.data); + console.log(`deleted subject ${topic}-value}`); }) .catch((error) => { - console.error(error); + console.error(error.response.status); + console.error(error.response.data.message); }); } } From 45313bbfff3b521d81eb91abd8024bda909c57fb Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Fri, 3 Mar 2023 14:20:03 -0800 Subject: [PATCH 4/4] log the names of the deleted topics --- src/kafka/cleanKafka.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/kafka/cleanKafka.js b/src/kafka/cleanKafka.js index 3c90f39..ed32a3a 100644 --- a/src/kafka/cleanKafka.js +++ b/src/kafka/cleanKafka.js @@ -21,7 +21,7 @@ async function deleteSchemaSubjects(topics) { } ).then((response) => { console.log(response.status); - console.log(`deleted subject ${topic}-value}`); + console.log(`deleted subject ${topic}-value`); }) .catch((error) => { console.error(error.response.status); @@ -43,6 +43,7 @@ module.exports = async (format, topics) => { await admin.deleteTopics({ topics: topics }) + console.log(`deleted Kafka topics ${topics}`) } catch (error) { console.log(error) }