From dc31196732c7ae717d8deb3cb1917132c8299a7a Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 12:26:58 -0800 Subject: [PATCH 01/17] move -s before -f and modify wording of -c description --- datagen.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datagen.js b/datagen.js index 8c9cc08..6cf8bf4 100755 --- a/datagen.js +++ b/datagen.js @@ -20,19 +20,19 @@ const { program, Option } = require('commander'); program.name('datagen').description('Fake Data Generator').version('0.1.2'); program + .requiredOption('-s, --schema ', 'Schema file to use') .addOption( new Option('-f, --format ', 'The format of the produced data') .choices(['json', 'avro']) .default('json') ) - .requiredOption('-s, --schema ', 'Schema file to use') .addOption( new Option( '-n, --number ', 'Number of records to generate. For infinite records, use -1' ).default('10') ) - .option('-c, --clean', 'Clean Kafka topic and schema registry before producing data') + .option('-c, --clean', 'Clean (delete) Kafka topics and schema subjects previously created') .option('-dr, --dry-run', 'Dry run (no data will be produced to Kafka)') .option('-d, --debug', 'Output extra debugging information') .option('-w, --wait ', 'Wait time in ms between record production', parseInt) From 298a428653d2158f28a01d6d573e480693163ee1 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 12:28:15 -0800 Subject: [PATCH 02/17] skip schema registration on dry run --- src/dataGenerator.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/dataGenerator.js b/src/dataGenerator.js index 1406a60..640e53e 100644 --- a/src/dataGenerator.js +++ b/src/dataGenerator.js @@ -111,7 +111,15 @@ module.exports = async ({ if (iteration == 0) { if (format == 'avro') { - registry = await schemaRegistryConfig(); + if (dryRun) { + alert({ + type: `success`, + name: `Dry run: Skipping schema registration...`, + msg: `` + }); + } else { + registry = await schemaRegistryConfig(); + } } for (const topic in megaRecord) { await prepareTopic(topic, dryRun); From 298288ffcedccc1c5def8cd8ae58c6aa9affe88f Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 12:29:31 -0800 Subject: [PATCH 03/17] add alert library to schema registry config so proper alert is triggered when no sr url is found --- src/kafka/schemaRegistryConfig.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/kafka/schemaRegistryConfig.js b/src/kafka/schemaRegistryConfig.js index 7ec9217..e48421e 100644 --- a/src/kafka/schemaRegistryConfig.js +++ b/src/kafka/schemaRegistryConfig.js @@ -1,5 +1,7 @@ const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry'); const dotenv = require('dotenv'); +const alert = require('cli-alerts'); + module.exports = async () => { From 11eab868ba1c071ac3e76ef3bca90a7bf33a8d4f Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 12:36:15 -0800 Subject: [PATCH 04/17] bump version to 0.1.3 --- datagen.js | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datagen.js b/datagen.js index 6cf8bf4..d379087 100755 --- a/datagen.js +++ b/datagen.js @@ -17,7 +17,7 @@ const dataGenerator = require('./src/dataGenerator'); const fs = require('fs'); const { program, Option } = require('commander'); -program.name('datagen').description('Fake Data Generator').version('0.1.2'); +program.name('datagen').description('Fake Data Generator').version('0.1.3'); program .requiredOption('-s, --schema ', 'Schema file to use') diff --git 
a/package.json b/package.json index b10ef49..a47891a 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@materializeinc/datagen", "description": "Materialize Datagen CLI tool", - "version": "0.1.2", + "version": "0.1.3", "license": "Apache-2.0", "bin": { "@MaterializeInc/datagen": "datagen.js", From bcf9e24120fd3afbc398c388a4497296362e8cbd Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 12:36:44 -0800 Subject: [PATCH 05/17] add example-schemas folder --- example-schemas/blog.json | 61 ++++++++++++++++++++++++++++++++++ example-schemas/ecommerce.json | 51 ++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 example-schemas/blog.json create mode 100644 example-schemas/ecommerce.json diff --git a/example-schemas/blog.json b/example-schemas/blog.json new file mode 100644 index 0000000..ac0bc02 --- /dev/null +++ b/example-schemas/blog.json @@ -0,0 +1,61 @@ +[ + { + "_meta": { + "topic": "mz_datagen_blog_users", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_blog_posts", + "parent_field": "id", + "child_field": "user_id", + "records_per": 2 + } + ] + }, + "id": "datatype.number(100)", + "name": "internet.userName", + "email": "internet.exampleEmail", + "phone": "phone.imei", + "website": "internet.domainName", + "city": "address.city", + "company": "company.name" + }, + { + "_meta": { + "topic": "mz_datagen_blog_posts", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_blog_comments", + "parent_field": "id", + "child_field": "post_id", + "records_per": 2 + } + ] + }, + "id": "datatype.number(1000)", + "user_id": "datatype.number(100)", + "title": "lorem.sentence", + "body": "lorem.paragraph" + }, + { + "_meta": { + "topic": "mz_datagen_blog_comments", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_blog_users", + "parent_field": "user_id", + "child_field": "id", + "records_per": 1 + } + ] + }, + "id": "datatype.number(2000)", + "user_id": "datatype.number(100)", + "body": "lorem.paragraph", + "post_id": "datatype.number(1000)", + "views": "datatype.number({\"min\": 100, \"max\": 1000})", + "status": "datatype.number(1)" + } +] \ No newline at end of file diff --git a/example-schemas/ecommerce.json b/example-schemas/ecommerce.json new file mode 100644 index 0000000..1b459db --- /dev/null +++ b/example-schemas/ecommerce.json @@ -0,0 +1,51 @@ +[ + { + "_meta": { + "topic": "mz_datagen_ecommerce_users", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_ecommerce_purchases", + "parent_field": "id", + "child_field": "user_id", + "records_per": 2 + } + ] + }, + "id": "datatype.number(1000)", + "name": "internet.userName", + "email": "internet.exampleEmail", + "city": "address.city", + "state": "address.state", + "zipcode": "address.zipCode", + "created_at": "date.recent" + }, + { + "_meta": { + "topic": "mz_datagen_ecommerce_purchases", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_ecommerce_items", + "parent_field": "item_id", + "child_field": "id", + "records_per": 3 + } + ] + }, + "id": "datatype.uuid", + "user_id": "datatype.number(1000)", + "item_id": "datatype.number(5000)", + "created_at": "date.recent" + }, + { + "_meta": { + "topic": "mz_datagen_ecommerce_items", + "key": "id" + }, + "id": "datatype.number(5000)", + "name": "commerce.product", + "price": "commerce.price", + "created_at": "date.recent" + } +] \ No newline at end of file From cd1ac1b3aab3b3ad9dd6a6ca67d7547b4880dadf Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 
14:06:22 -0800 Subject: [PATCH 06/17] remove extraneous hydra.key from avsc file --- tests/schema.avsc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/schema.avsc b/tests/schema.avsc index 09aa85f..c146731 100644 --- a/tests/schema.avsc +++ b/tests/schema.avsc @@ -10,7 +10,6 @@ { "name": "isLimited", "type": "boolean" }, { "name": "sizes", "type": ["null", "string"], "default": null }, { "name": "ownerIds", "type": { "type": "array", "items": "string" } } - ], - "hydra.key": "id" + ] } From f0768c97351a69fe64d035bb2abc3c98271c0bb1 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 14:06:34 -0800 Subject: [PATCH 07/17] update readme --- README.md | 263 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 171 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index caff4c7..db7fad7 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,30 @@ # Datagen CLI -### Installation +This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON or Avro format. + +The benefits of using this datagen tool are: +- You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream. +- This is a relatively simple CLI tool compared to other Kafka data generators that require Kafka Connect. +- When using the `avro` output format, datagen connects to Schema Registry. This allows you to take advantage of the [benefits](https://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/) of using Schema Registry. +- Often when you generate random data, your downstream join results won't make sense because it's unlikely a randomly generated field in one dataset will match a randomly generated field in another. With this datagen tool, you can specify relationships between your datasets so that related columns will match up, resulting in meaningful joins downstream. Jump to the [end-to-end tutorial](./end-to-end.md) later in this document for a full example. + +> :construction: Specifying relationships between datasets currently requires using JSON for the input schema. + +## Installation + +### npm + +``` +npm install -g @materializeinc/datagen +``` + +### Docker + +``` +docker pull materialize/datagen +``` +### From Source -> Note: Until the package has been published on npmjs.org, you can install it from source ```bash git clone https://github.com/MaterializeInc/datagen.git @@ -11,20 +33,43 @@ npm install npm link ``` -### Usage +## Setup + +Create a file called `.env` with the following environment variables + +```bash +# Connect to Kafka +SASL_USERNAME= +SASL_PASSWORD= +SASL_MECHANISM= +KAFKA_BROKERS= + +# Connect to Schema Registry if producing Avro +SCHEMA_REGISTRY_URL= +SCHEMA_REGISTRY_USERNAME= +SCHEMA_REGISTRY_PASSWORD= +``` + +The `datagen` program will read the environment variables from `.env` in the current working directory. + + +## Usage ```bash datagen -h +``` + +``` Usage: datagen [options] Fake Data Generator Options: -V, --version output the version number - -f, --format The format of the produced data (choices: "json", "avro", default: "json") -s, --schema Schema file to use + -f, --format The format of the produced data (choices: "json", "avro", default: "json") -n, --number Number of records to generate. 
For infinite records, use -1 (default: "10") - -c, --clean Clean Kafka topic and schema registry before producing data + -c, --clean Clean (delete) Kafka topics and schema subjects previously created -dr, --dry-run Dry run (no data will be produced to Kafka) -d, --debug Output extra debugging information -w, --wait Wait time in ms between record production @@ -32,44 +77,58 @@ Options: -h, --help display help for command ``` -### Env variables -To produce records to a Kafka topic, you need to set the following environment variables: +## Quick Examples -```bash -SASL_USERNAME= -SASL_PASSWORD= -SASL_MECHANISM= -KAFKA_BROKERS= -``` - -### Examples +See example input schema files in [example-schemas](/example-schemas) and [tests](/tests) folders. -```bash -# Generate 10 records in JSON format -datagen -s products.sql -f json -n 10 -``` +### Quickstart -Output: +1. Iterate through a schema defined in SQL 10 times, but don't actually interact with Kafka or Schema Registry ("dry run"). Also, see extra output with debug mode. + ```bash + datagen --schema products.sql --format avro --dry-run --debug + ``` -``` -✔ Parsing schema... +1. Same as above, but actually create the schema subjects and Kafka topics, and actually produce the data. There is less output because debug mode is off. + ```bash + datagen \ + --schema products.sql \ + --format avro + ``` +1. Same as above, but produce to Kafka continuously. Press `Ctrl+C` to quit. + ```bash + datagen \ + -s products.sql \ + -f avro \ + -n -1 + ``` -✔ Creating Kafka topic... +1. If you want to generate a larger payload, you can use the `--record-size` option to specify number of bytes of junk data to add to each record. Here, we generate a 1MB record. So if you have to generate 1GB of data, you run the command with the following options: + ```bash + datagen \ + -s products.sql \ + -f avro \ + -n 1000 \ + --record-size 1048576 + ``` + This will add a `recordSizePayload` field to the record with the specified size and will send the record to Kafka. + > :notebook: The 'Max Message Size' of your Kafka cluster needs to be set to a higher value than 1MB for this to work. -✔ Producing records... +1. Clean (delete) the topics and schema subjects created above + ```bash + datagen \ + --schema products.sql \ + --format avro \ + --clean + ``` +### Generate records with sequence numbers -✔ Record sent to Kafka topic - {"products":{"id":50720,"name":"white","merchant_id":76809,"price":1170,"status":89517,"created_at":"upset"}} - ... -``` - -### JSON Schema +To simulate auto incrementing primary keys, you can use the `iteration.index` variable in the schema. -The JSON schema option allows you to define the data that is generated using Faker.js. +This is particularly useful when you want to generate a small set of records with sequence of IDs, for example 1000 records with IDs from 1 to 1000: ```json [ @@ -77,47 +136,86 @@ The JSON schema option allows you to define the data that is generated using Fak "_meta": { "topic": "mz_datagen_users" }, - "id": "datatype.uuid", + "id": "iteration.index", "name": "internet.userName", - "email": "internet.exampleEmail", - "phone": "phone.imei", - "website": "internet.domainName", - "city": "address.city", - "company": "company.name", - "age": "datatype.number", - "created_at": "datatype.datetime" } ] ``` -The schema needs to be an array of objects, as that way we can produce relational data in the future. +Example: -Each object represents a record that will be generated. 
The `_meta` key is used to define the topic that the record will be sent to. +``` +datagen \ + -s tests/iterationIndex.json \ + -f json \ + -n 1000 \ + --dry-run +``` -You can find the documentation for Faker.js [here](https://fakerjs.dev/api/) +### Docker -### Record Size Option +Call the docker container like you would call the CLI locally, except: +- include `--rm` to remove the container when it exits +- include `-it` (interactive teletype) to see the output as you would locally (e.g. colors) +- mount `.env` and schema files into the container +- note that the working directory in the container is `/app` -In some cases, you might need to generate a large amount of data. In that case, you can use the `--record-size` option to generate a record of a specific size. +``` +docker run \ + --rm -it \ + -v ${PWD}/.env:/app/.env \ + -v ${PWD}/tests/schema.json:/app/blah.json \ + materialize/datagen -s blah.json -n 1 --dry-run +``` -The `--record-size 1048576` option will generate a 1MB record. So if you have to generate 1GB of data, you run the command with the following options: +## Input Schemas -```bash -datagen -s ./tests/datasize.json -f json -n 1000 --record-size 1048576 +You can define input schemas using JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`). Within those schemas, you use the [FakerJS API](https://fakerjs.dev/api/) to define the data that is generated for each field. + +You can pass arguments to `faker` methods by escaping quotes. For example, here is [datatype.number](https://fakerjs.dev/api/datatype.html#number) with `min` and `max` arguments: + +``` +"datatype.number({\"min\": 100, \"max\": 1000})" ``` -This will add a `recordSizePayload` key to the record with the specified size and will send the record to Kafka. +> :construction: Right now, JSON is the only kind of input schema that supports generating relational data. +### JSON Schema -> Note: The 'Max Message Size' of your Kafka cluster needs to be set to a higher value than 1MB for this to work. +Here is the general syntax for a JSON input schema: -### `UPSERT` Evelope Support +```json +[ + { + "_meta": { + "topic": "", + "key": "" , + "relationships": [ + { + "topic": "", + "parent_field": "", + "child_field": "", + "records_per": + }, + ... + ] + }, + "": "", + "": "", + ... + }, + { + ... + }, + ... +] +``` + +Go to the [end-to-end tutorial](./end-to-end.md) to walk through an example that uses a JSON input schema. -To make sure `UPSERT` envelope is supported, you need to define an `id` column in the schema. -The value of the `id` column will be used as the key of the record. -### Faker.js and SQL Schema +### SQL Schema -The SQL schema option allows you to define the data that is generated using Faker.js by defining a `COMMENT` on the column. +The SQL schema option allows you to use a `CREATE TABLE` statement to define what data is generated. You specify the [FakerJS API](https://fakerjs.dev/api/) method using a `COMMENT` on the column. Here is an example: ```sql CREATE TABLE "ecommerce"."products" ( @@ -130,46 +228,27 @@ CREATE TABLE "ecommerce"."products" ( ); ``` -The `COMMENT` needs to be a valid Faker.js function. You can find the documentation for Faker.js [here](https://fakerjs.dev/api/). - -### Docker - -Build the docker image. - -``` -docker buildx build -t datagen . -``` - -Run a command. +This will produce the desired mock data to the topic `ecommerce.products`. 
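To make the `COMMENT`-to-Faker mapping concrete, here is a minimal, illustrative sketch of how a comment string such as `internet.userName` or `datatype.number(100)` could be resolved into a FakerJS call. This is not the project's actual parser (datagen uses `node-sql-parser` and its own conventions); it only shows the idea of mapping `module.method(args)` onto the FakerJS API.

```js
// Illustrative sketch only -- not the datagen implementation.
const { faker } = require('@faker-js/faker');

function fakerFromComment(comment) {
    // Split 'module.method' or 'module.method(arg)' into its parts.
    const match = comment.match(/^(\w+)\.(\w+)(?:\((.*)\))?$/);
    if (!match) return null;
    const [, moduleName, methodName, rawArg] = match;
    const mod = faker[moduleName];
    if (!mod || typeof mod[methodName] !== 'function') return null;
    // Arguments, when present, are parsed as JSON (e.g. 100 or {"min": 1, "max": 10}).
    return rawArg ? mod[methodName](JSON.parse(rawArg)) : mod[methodName]();
}

console.log(fakerFromComment('internet.userName'));    // a random user name
console.log(fakerFromComment('datatype.number(100)')); // an integer between 0 and 100
```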
-``` -docker run \ - --rm -it \ - -v ${PWD}/.env:/app/.env \ - -v ${PWD}/tests/schema.json:/app/blah.json \ - datagen -s blah.json -n 1 --dry-run -``` +### Avro Schema -### Generate records with sequence numbers +> :construction: Avro input schema currently does not support arbitrary FakerJS methods. Instead, data is randomly generated based on the type. -To simulate auto incrementing primary keys, you can use the `iteration.index` variable in the schema. - -This is particularly useful when you want to generate a small set of records with sequence of IDs, for example 1000 records with IDs from 1 to 1000: +Here is an example Avro input schema from `tests/schema.avsc` that will produce data to a topic called `products`: ```json -[ - { - "_meta": { - "topic": "mz_datagen_users" - }, - "id": "iteration.index", - "name": "internet.userName", - } -] -``` - -Example: - -``` -datagen -s tests/iterationIndex.json --dry-run -f json -n 1000 -``` +{ + "type": "record", + "name": "products", + "namespace": "exp.products.v1", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "productId", "type": ["null", "string"] }, + { "name": "title", "type": "string" }, + { "name": "price", "type": "int" }, + { "name": "isLimited", "type": "boolean" }, + { "name": "sizes", "type": ["null", "string"], "default": null }, + { "name": "ownerIds", "type": { "type": "array", "items": "string" } } + ] +} +``` \ No newline at end of file From a57f7c44b9a8c310313d31e073e029760184425d Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 14:17:24 -0800 Subject: [PATCH 08/17] rename compose.yml to docker-compose.yml for clarity --- docker-compose.yaml | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 docker-compose.yaml diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..256e707 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,48 @@ +--- +version: "3.9" + +services: + + zookeeper: + image: quay.io/debezium/zookeeper:1.9 + container_name: zookeeper + ports: + - 2181:2181 + - 2888:2888 + - 3888:3888 + + kafka: + image: quay.io/debezium/kafka:1.9 + container_name: kafka + ports: + - 9092:9092 + links: + - zookeeper + environment: + ZOOKEEPER_CONNECT: zookeeper:2181 + + schema-registry: + image: confluentinc/cp-schema-registry:7.2.0 + container_name: schema-registry + ports: + - 8081:8081 + depends_on: + - kafka + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092 + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081,http://localhost:8081 + + datagen: + build: . 
+ container_name: datagen + depends_on: + - kafka + - schema-registry + environment: + SCHEMA_REGISTRY_URL: http://schema-registry:8081 + KAFKA_BROKERS: kafka:9092 + volumes: + - ./tests:/tests + # Override the entrypoint to run the container and keep it running + entrypoint: sh -c "while true; do sleep 1; done" From 69bf739955a670d1e54ed827eac5d6d8c699132d Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 14:21:11 -0800 Subject: [PATCH 09/17] rename and move files --- README.md | 18 +++--- compose.yaml | 48 ---------------- {example-schemas => examples}/blog.json | 0 {example-schemas => examples}/ecommerce.json | 0 examples/ecommerce.md | 60 ++++++++++++++++++++ tests/datagen.test.js | 4 +- tests/{schema.sql => products.sql} | 0 7 files changed, 71 insertions(+), 59 deletions(-) delete mode 100644 compose.yaml rename {example-schemas => examples}/blog.json (100%) rename {example-schemas => examples}/ecommerce.json (100%) create mode 100644 examples/ecommerce.md rename tests/{schema.sql => products.sql} (100%) diff --git a/README.md b/README.md index db7fad7..49e0438 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The benefits of using this datagen tool are: - You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream. - This is a relatively simple CLI tool compared to other Kafka data generators that require Kafka Connect. - When using the `avro` output format, datagen connects to Schema Registry. This allows you to take advantage of the [benefits](https://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/) of using Schema Registry. -- Often when you generate random data, your downstream join results won't make sense because it's unlikely a randomly generated field in one dataset will match a randomly generated field in another. With this datagen tool, you can specify relationships between your datasets so that related columns will match up, resulting in meaningful joins downstream. Jump to the [end-to-end tutorial](./end-to-end.md) later in this document for a full example. +- Often when you generate random data, your downstream join results won't make sense because it's unlikely a randomly generated field in one dataset will match a randomly generated field in another. With this datagen tool, you can specify relationships between your datasets so that related columns will match up, resulting in meaningful joins downstream. Jump to the [end-to-end ecommerce tutorial](./examples/ecommerce.md) for a full example. > :construction: Specifying relationships between datasets currently requires using JSON for the input schema. @@ -44,7 +44,7 @@ SASL_PASSWORD= SASL_MECHANISM= KAFKA_BROKERS= -# Connect to Schema Registry if producing Avro +# Connect to Schema Registry if using '--format avro' SCHEMA_REGISTRY_URL= SCHEMA_REGISTRY_USERNAME= SCHEMA_REGISTRY_PASSWORD= @@ -80,26 +80,26 @@ Options: ## Quick Examples -See example input schema files in [example-schemas](/example-schemas) and [tests](/tests) folders. +See example input schema files in [examples](./examples) and [tests](/tests) folders. ### Quickstart 1. Iterate through a schema defined in SQL 10 times, but don't actually interact with Kafka or Schema Registry ("dry run"). Also, see extra output with debug mode. 
```bash - datagen --schema products.sql --format avro --dry-run --debug + datagen --schema tests/products.sql --format avro --dry-run --debug ``` 1. Same as above, but actually create the schema subjects and Kafka topics, and actually produce the data. There is less output because debug mode is off. ```bash datagen \ - --schema products.sql \ + --schema tests/products.sql \ --format avro ``` 1. Same as above, but produce to Kafka continuously. Press `Ctrl+C` to quit. ```bash datagen \ - -s products.sql \ + -s tests/products.sql \ -f avro \ -n -1 ``` @@ -107,7 +107,7 @@ See example input schema files in [example-schemas](/example-schemas) and [tests 1. If you want to generate a larger payload, you can use the `--record-size` option to specify number of bytes of junk data to add to each record. Here, we generate a 1MB record. So if you have to generate 1GB of data, you run the command with the following options: ```bash datagen \ - -s products.sql \ + -s tests/products.sql \ -f avro \ -n 1000 \ --record-size 1048576 @@ -119,7 +119,7 @@ See example input schema files in [example-schemas](/example-schemas) and [tests 1. Clean (delete) the topics and schema subjects created above ```bash datagen \ - --schema products.sql \ + --schema tests/products.sql \ --format avro \ --clean ``` @@ -210,7 +210,7 @@ Here is the general syntax for a JSON input schema: ] ``` -Go to the [end-to-end tutorial](./end-to-end.md) to walk through an example that uses a JSON input schema. +Go to the [end-to-end ecommerce tutorial](./examples/ecommerce.md) to walk through an example that uses a JSON input schema with relational data. ### SQL Schema diff --git a/compose.yaml b/compose.yaml deleted file mode 100644 index 256e707..0000000 --- a/compose.yaml +++ /dev/null @@ -1,48 +0,0 @@ ---- -version: "3.9" - -services: - - zookeeper: - image: quay.io/debezium/zookeeper:1.9 - container_name: zookeeper - ports: - - 2181:2181 - - 2888:2888 - - 3888:3888 - - kafka: - image: quay.io/debezium/kafka:1.9 - container_name: kafka - ports: - - 9092:9092 - links: - - zookeeper - environment: - ZOOKEEPER_CONNECT: zookeeper:2181 - - schema-registry: - image: confluentinc/cp-schema-registry:7.2.0 - container_name: schema-registry - ports: - - 8081:8081 - depends_on: - - kafka - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092 - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081,http://localhost:8081 - - datagen: - build: . - container_name: datagen - depends_on: - - kafka - - schema-registry - environment: - SCHEMA_REGISTRY_URL: http://schema-registry:8081 - KAFKA_BROKERS: kafka:9092 - volumes: - - ./tests:/tests - # Override the entrypoint to run the container and keep it running - entrypoint: sh -c "while true; do sleep 1; done" diff --git a/example-schemas/blog.json b/examples/blog.json similarity index 100% rename from example-schemas/blog.json rename to examples/blog.json diff --git a/example-schemas/ecommerce.json b/examples/ecommerce.json similarity index 100% rename from example-schemas/ecommerce.json rename to examples/ecommerce.json diff --git a/examples/ecommerce.md b/examples/ecommerce.md new file mode 100644 index 0000000..d8fffd3 --- /dev/null +++ b/examples/ecommerce.md @@ -0,0 +1,60 @@ +# End-to-end Tutorial with Materialize + + +Consider the example from `example-schemas/ecommerce.json`. There are `users`, `purchases`, and `items`. 
For each iteration of `datagen`, a user is created, then 2 purchases are created that are associated with that user, and then 3 items are created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. + +Here is the input schema: + +```json +[ + { + "_meta": { + "topic": "mz_datagen_ecommerce_users", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_ecommerce_purchases", + "parent_field": "id", + "child_field": "user_id", + "records_per": 2 + } + ] + }, + "id": "datatype.number(1000)", + "name": "internet.userName", + "email": "internet.exampleEmail", + "city": "address.city", + "state": "address.state", + "zipcode": "address.zipCode", + "created_at": "date.recent" + }, + { + "_meta": { + "topic": "mz_datagen_ecommerce_purchases", + "key": "id", + "relationships": [ + { + "topic": "mz_datagen_ecommerce_items", + "parent_field": "item_id", + "child_field": "id", + "records_per": 3 + } + ] + }, + "id": "datatype.uuid", + "user_id": "datatype.number(1000)", + "item_id": "datatype.number(5000)", + "created_at": "date.recent" + }, + { + "_meta": { + "topic": "mz_datagen_ecommerce_items", + "key": "id" + }, + "id": "datatype.number(5000)", + "name": "commerce.product", + "price": "commerce.price", + "created_at": "date.recent" + } +] +``` \ No newline at end of file diff --git a/tests/datagen.test.js b/tests/datagen.test.js index 06e7d83..164eb44 100644 --- a/tests/datagen.test.js +++ b/tests/datagen.test.js @@ -22,7 +22,7 @@ describe('Schema Parsing Tests', () => { expect(output).toContain('Stopping the data generator'); }); test('should parse sql schema', () => { - const schema = './tests/schema.sql'; + const schema = './tests/products.sql'; const output = datagen(`-s ${schema} -n 2`); expect(output).toContain('Parsing schema...'); expect(output).toContain('Dry run: Skipping topic creation...'); @@ -65,7 +65,7 @@ describe('Test record size', () => { expect(output).toContain('recordSizePayload'); }); test('should contain the recordSizePayload if record size is set in sql schema', () => { - const schema = './tests/schema.sql'; + const schema = './tests/products.sql'; const output = datagen(`-s ${schema} -n 2 -rs 100`); expect(output).toContain('recordSizePayload'); }); diff --git a/tests/schema.sql b/tests/products.sql similarity index 100% rename from tests/schema.sql rename to tests/products.sql From 493e7c4b6a2c2b000a7bb10593c02c6a4685a20d Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 14:32:25 -0800 Subject: [PATCH 10/17] remove unused code in sql parser --- src/schemas/parseSqlSchema.js | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/schemas/parseSqlSchema.js b/src/schemas/parseSqlSchema.js index c88dd16..4a300a2 100644 --- a/src/schemas/parseSqlSchema.js +++ b/src/schemas/parseSqlSchema.js @@ -1,7 +1,6 @@ const alert = require('cli-alerts'); const { Parser } = require('node-sql-parser'); const fs = require('fs'); -const { faker } = require('@faker-js/faker'); async function parseSqlSchema(schemaFile) { alert({ @@ -95,28 +94,5 @@ async function getSqlTopicName(schemaFile) { return 'datagen_test_topic'; } -function generateDataBasedOnType(column, record) { - switch (column.definition.dataType.toLowerCase()) { - case 'string': - record[column.column.column] = { column: faker.word.adjective() }; - break; - case 'int': - case 'serial': - case 'bigint': - record[column.column.column] = faker.datatype.number(); - break; - case 'text': - record[column.column.column] = 
faker.lorem.paragraph(); - break; - case 'timestamp': - record[column.column.column] = faker.date.past(); - break; - default: - record[column.column.column] = faker.word.adjective(); - break; - } - return record; -} - exports.parseSqlSchema = parseSqlSchema; exports.getSqlTopicName = getSqlTopicName; From 041067ddb9679b66bd7653c9d149e0df56dda619 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 14:57:13 -0800 Subject: [PATCH 11/17] make sql parser pick up primary key --- src/schemas/parseSqlSchema.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/schemas/parseSqlSchema.js b/src/schemas/parseSqlSchema.js index 4a300a2..f682f71 100644 --- a/src/schemas/parseSqlSchema.js +++ b/src/schemas/parseSqlSchema.js @@ -49,9 +49,8 @@ async function convertSqlSchemaToJson(tables) { } }; table.columns.forEach(column => { - if (column.constraint_type === 'primary key') { - schema._meta['key'] = column.definition[0].column; - return; + if (column.unique_or_primary === 'primary key') { + schema._meta['key'] = column.column.column; } if ( column.comment && From 93afee47a282d77f6a1faf65994ebf63d1ac8a17 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 15:26:45 -0800 Subject: [PATCH 12/17] work in progress ecommerce demo --- examples/ecommerce.md | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/examples/ecommerce.md b/examples/ecommerce.md index d8fffd3..a367f7b 100644 --- a/examples/ecommerce.md +++ b/examples/ecommerce.md @@ -1,7 +1,8 @@ # End-to-end Tutorial with Materialize +In this tutorial, we will generate relational data about `users`, `purchases`, and `items`. Each user has zero to many purchases, and each purchase has one to many items. At the end, we might like to retrieve a purchase history for a specific user by querying a streaming database. -Consider the example from `example-schemas/ecommerce.json`. There are `users`, `purchases`, and `items`. For each iteration of `datagen`, a user is created, then 2 purchases are created that are associated with that user, and then 3 items are created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. +## Study the Input Schema Here is the input schema: @@ -57,4 +58,34 @@ Here is the input schema: "created_at": "date.recent" } ] -``` \ No newline at end of file +``` + +On each iteration of `datagen`: +1. A user is created, then +1. Two purchases are created that are associated with that user, and then +1. Three items are created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. + +Here are a couple of important ideas to note: +- The file is a list of datasets +- Each dataset has a `_meta` object that specifies the key, Kafka topic, and a list of relationships + - Each relationship specifies the topic, parent field, matching child field, and how many child records should be produced for each parent record +- Each dataset has several fields, where field name is mapped to a [FakerJS API](https://fakerjs.dev/api/) method. +- The primary keys for `users` and `items` use `faker.datatype()` + - In effect, this limits the keyspace for these records. For example, user ID is specified with `datatype.number(1000)`, which means there will be a maximum of 1000 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). 
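To make the bounded key space described above concrete, here is a small illustrative sketch (it uses FakerJS directly and is not part of datagen itself). With `datatype.number(1000)` there is only a small, fixed set of possible ids, so a long run inevitably repeats keys, and a downstream consumer with `UPSERT` semantics treats each repeat as an update to an existing row:

```js
// Illustrative only: repeated ids from a bounded key space become updates downstream.
const { faker } = require('@faker-js/faker');

const seen = new Set();
let repeats = 0;
for (let i = 0; i < 5000; i++) {
    const id = faker.datatype.number(1000); // same bound as the "id" field above
    if (seen.has(id)) {
        repeats += 1; // a Kafka record with an existing key -> an update under UPSERT
    } else {
        seen.add(id);
    }
}
console.log(`distinct user ids: ${seen.size}, records that would be updates: ${repeats}`);
```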
+ +## Set up infrastructure + +This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry as a target for `datagen`, and Materialize as the streaming database for queries. + +### Confluent Cloud + +1. Create an account on [Confluent Cloud](confluent.cloud). +1. Create a basic cluster in AWS us-east-1. +1. Create an API key / secret for your cluster. +1. Enable Schema Registry. +1. Create an API key / secret for Schema Registry. + +### Materialize + +1. [Register for access](https://materialize.com/register/) to Materialize. +1. Create a \ No newline at end of file From 6e33e0d035fa1174e2b3ec849476d15ed70dbe43 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 20:56:21 -0800 Subject: [PATCH 13/17] work on end to end example --- examples/ecommerce.json | 13 ++-- examples/ecommerce.md | 168 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 158 insertions(+), 23 deletions(-) diff --git a/examples/ecommerce.json b/examples/ecommerce.json index 1b459db..baa380b 100644 --- a/examples/ecommerce.json +++ b/examples/ecommerce.json @@ -8,11 +8,11 @@ "topic": "mz_datagen_ecommerce_purchases", "parent_field": "id", "child_field": "user_id", - "records_per": 2 + "records_per": 4 } ] }, - "id": "datatype.number(1000)", + "id": "datatype.number(100)", "name": "internet.userName", "email": "internet.exampleEmail", "city": "address.city", @@ -29,14 +29,13 @@ "topic": "mz_datagen_ecommerce_items", "parent_field": "item_id", "child_field": "id", - "records_per": 3 + "records_per": 1 } ] }, - "id": "datatype.uuid", - "user_id": "datatype.number(1000)", - "item_id": "datatype.number(5000)", - "created_at": "date.recent" + "id": "datatype.number(1000)", + "user_id": "datatype.number(100)", + "item_id": "datatype.number(5000)" }, { "_meta": { diff --git a/examples/ecommerce.md b/examples/ecommerce.md index a367f7b..dc04ff3 100644 --- a/examples/ecommerce.md +++ b/examples/ecommerce.md @@ -1,6 +1,6 @@ # End-to-end Tutorial with Materialize -In this tutorial, we will generate relational data about `users`, `purchases`, and `items`. Each user has zero to many purchases, and each purchase has one to many items. At the end, we might like to retrieve a purchase history for a specific user by querying a streaming database. +In this tutorial, we will generate relational data about `users`, `purchases`, and `items`. Users make purchases, and purchases have items. At the end, we might like to retrieve a purchase history for a specific user by querying a streaming database. ## Study the Input Schema @@ -17,11 +17,11 @@ Here is the input schema: "topic": "mz_datagen_ecommerce_purchases", "parent_field": "id", "child_field": "user_id", - "records_per": 2 + "records_per": 4 } ] }, - "id": "datatype.number(1000)", + "id": "datatype.number(100)", "name": "internet.userName", "email": "internet.exampleEmail", "city": "address.city", @@ -38,14 +38,13 @@ Here is the input schema: "topic": "mz_datagen_ecommerce_items", "parent_field": "item_id", "child_field": "id", - "records_per": 3 + "records_per": 1 } ] }, - "id": "datatype.uuid", - "user_id": "datatype.number(1000)", - "item_id": "datatype.number(5000)", - "created_at": "date.recent" + "id": "datatype.number(1000)", + "user_id": "datatype.number(100)", + "item_id": "datatype.number(5000)" }, { "_meta": { @@ -62,16 +61,19 @@ Here is the input schema: On each iteration of `datagen`: 1. A user is created, then -1. Two purchases are created that are associated with that user, and then -1. 
Three items are created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. +1. Four purchases are created that are associated with that user, and then +1. An item is created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. Here are a couple of important ideas to note: - The file is a list of datasets - Each dataset has a `_meta` object that specifies the key, Kafka topic, and a list of relationships - Each relationship specifies the topic, parent field, matching child field, and how many child records should be produced for each parent record - Each dataset has several fields, where field name is mapped to a [FakerJS API](https://fakerjs.dev/api/) method. -- The primary keys for `users` and `items` use `faker.datatype()` - - In effect, this limits the keyspace for these records. For example, user ID is specified with `datatype.number(1000)`, which means there will be a maximum of 1000 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). +- The primary keys use `faker.datatype()` + - In effect, this limits the key space for these records. + - For example, user ID is specified with `datatype.number(100)`, which means there will be a maximum of 100 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). +- Since each purchase record has only one item, we will need multiple records with the same purchase ID in order to show all the different items. That means we will need to do some aggregations by purchase ID downstream (more on Materialize's `NONE` envelope, a.k.a. append-only sources later) +- Given the key spaces, it's possible for multiple users to generate different purchase records with the same purchase ID. We can interpret that to mean multiple users can participate in a purchase. ## Set up infrastructure @@ -80,12 +82,146 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry ### Confluent Cloud 1. Create an account on [Confluent Cloud](confluent.cloud). -1. Create a basic cluster in AWS us-east-1. -1. Create an API key / secret for your cluster. +1. Create a basic cluster in AWS `us-east-1` or `eu-west-1`. +1. Create an API key / secret for your cluster and keep safe in a password manager. 1. Enable Schema Registry. -1. Create an API key / secret for Schema Registry. +1. Create an API key / secret for Schema Registry and keep safe in a password manager. + +### Datagen + +1. [Install datagen](../README.md#installation) if you haven't already. +1. Create a `.env` file with your Kafka and Schema Registry credentials (see [.env.example](../.env.example)). +1. Generate a single iteration of records with dry run and debug modes and check the output. + ``` + datagen \ + --schema examples/ecommerce.json \ + --format avro \ + --number 1 \ + --dry-run \ + --debug + ``` +1. Start producing data to Kafka while you set up Materialize. + ``` + datagen \ + -s examples/ecommerce.json \ + -f avro \ + -n -1 + ``` ### Materialize 1. [Register for access](https://materialize.com/register/) to Materialize. -1. Create a \ No newline at end of file +1. Enable your region. +1. In a separate terminal session, [install `psql`](https://materialize.com/docs/integrations/sql-clients/#psql). +1. 
Log into [Materialize](cloud.materialize.com) and create an app password. Save it in your password manager. +1. Connect to the database via `psql` with the connection string provided. +1. Create a `SECRET` called `confluent_kafka_password` that is your Kafka cluster API secret. + ```sql + CREATE SECRET confluent_kafka_password AS ''; + ``` +1. Create a `SECRET` called `csr_password` that is your Confluent Schema Registry API secret. + ```sql + CREATE SECRET csr_password AS ''; + ``` +1. Create a `KAFKA` connection called `confluent_kafka`. + ```sql + CREATE CONNECTION + confluent_kafka + TO + KAFKA ( + BROKER = 'pkc-XXXX.XXXX.aws.confluent.cloud:9092', + SASL MECHANISMS = 'PLAIN', + SASL USERNAME = '', + SASL PASSWORD = SECRET confluent_kafka_password + ); + ``` +1. Create a `CONFLUENT SCHEMA REGISTRY` connection called `csr`. + ```sql + CREATE CONNECTION + csr + TO + CONFLUENT SCHEMA REGISTRY ( + URL 'https://psrc-XXXX.XXXX.aws.confluent.cloud', + USERNAME = '', + PASSWORD = SECRET csr + ); + ``` +1. Create a cluster called `sources` to where you will run your Kafka sources. +```sql +CREATE CLUSTER + sources + REPLICAS ( + r1 (SIZE='3xsmall') + ); +``` +1. Quit your `psql` session with `Ctrl+D` or `\q` and run a small loop in your terminal to create a sources for `users` and `items`. + ```bash + for i in \ + mz_datagen_ecommerce_users \ + mz_datagen_ecommerce_purchases \ + mz_datagen_ecommerce_items; do + echo "CREATE SOURCE ${i#mz_datagen_ecommerce} + IN CLUSTER sources + FROM KAFKA CONNECTION confluent_kafka + (TOPIC '$i') + KEY FORMAT BYTES + VALUE FORMAT AVRO + USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + ENVELOPE UPSERT;" | \ + psql "postgres://%40:@XXX.XXXX.aws.materialize.cloud:6875/materialize?sslmode=require" + done + ``` + > :notebook: [`UPSERT` envelope](https://materialize.com/docs/sql/create-sink/#upsert-envelope) means that Kafka records of the same key will be interpreted as inserts (key doesn't exist yet), updates (key already exists), and deletes (`null` payload, a.k.a. tombstone). +1. Connect to Materialize with `psql` again and create a source for `purchases`. + ```sql + CREATE SOURCE purchases + IN CLUSTER sources + FROM KAFKA CONNECTION confluent_kafka + (TOPIC 'mz_datagen_ecommerce_purchases') + KEY FORMAT BYTES + VALUE FORMAT AVRO + USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + INCLUDE TIMESTAMP AS ts + ENVELOPE NONE; + ``` + > :notebook: A source that uses `ENVELOPE NONE` is referred to as an [append-only](https://materialize.com/docs/sql/create-source/#append-only-envelope) source. In this case, we treat all new records as inserts, even though they have the same key. In this case, a single purchase can have multiple rows corresponding to the different items in the purchase. + +## Query the Results + +Materialize specializes in efficient, incremental view maintenance over changing input data. Let's see it in action by computing purchase histories with joins and aggregations! + +1. Connect to Materialize with `psql` using your connection string and app password. +1. Create indexes on the primary keys. + ```sql + -- + ``` +1. Explore with an ad-hoc query. + ```sql + -- + ``` + > :bulb: Notice how the results are non empty! If we were generating random records, these joins would be empty because there would likely be no matches on the join conditions. +1. Create a view that calculates the purchase history for each user. + ```sql + -- + ``` +1. Create an index on that view to compute the results and load them into memory for efficient point lookups. 
+ ```sql + -- + ``` +1. Look up the purchase history for various users. + ```sql + -- + ``` +1. Subscribe to changes in purchase history. + ```sql + -- + ``` + +## Clean up + +1. Quit your `datagen` with `Ctrl-C`. +1. Run `datagen` again with the `--clean` option + +## Learn More + +Check out the Materialize [docs](www.materialize.com/docs) and [blog](www.materialize.com/blog) for more! \ No newline at end of file From 7ee30fbfde69386798ff0a63f8a69303a8764a23 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Mon, 6 Mar 2023 21:30:36 -0800 Subject: [PATCH 14/17] work on ecommerce tutorial --- examples/ecommerce.md | 47 ++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/examples/ecommerce.md b/examples/ecommerce.md index dc04ff3..defbef1 100644 --- a/examples/ecommerce.md +++ b/examples/ecommerce.md @@ -1,4 +1,4 @@ -# End-to-end Tutorial with Materialize +# End-to-end Ecommerce Tutorial with Materialize In this tutorial, we will generate relational data about `users`, `purchases`, and `items`. Users make purchases, and purchases have items. At the end, we might like to retrieve a purchase history for a specific user by querying a streaming database. @@ -92,7 +92,7 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry 1. [Install datagen](../README.md#installation) if you haven't already. 1. Create a `.env` file with your Kafka and Schema Registry credentials (see [.env.example](../.env.example)). 1. Generate a single iteration of records with dry run and debug modes and check the output. - ``` + ```bash datagen \ --schema examples/ecommerce.json \ --format avro \ @@ -101,7 +101,7 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry --debug ``` 1. Start producing data to Kafka while you set up Materialize. - ``` + ```bash datagen \ -s examples/ecommerce.json \ -f avro \ @@ -146,14 +146,22 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry PASSWORD = SECRET csr ); ``` -1. Create a cluster called `sources` to where you will run your Kafka sources. -```sql -CREATE CLUSTER - sources - REPLICAS ( - r1 (SIZE='3xsmall') - ); -``` +1. Create a cluster called `sources` where you will run your Kafka sources. + ```sql + CREATE CLUSTER + sources + REPLICAS ( + r1 (SIZE='3xsmall') + ); + ``` +1. Create a cluster called `ecommerce` where you will run your queries. + ```sql + CREATE CLUSTER + ecommerce + REPLICAS ( + r1 (SIZE='2xsmall') + ); + ``` 1. Quit your `psql` session with `Ctrl+D` or `\q` and run a small loop in your terminal to create a sources for `users` and `items`. ```bash for i in \ @@ -220,7 +228,22 @@ Materialize specializes in efficient, incremental view maintenance over changing ## Clean up 1. Quit your `datagen` with `Ctrl-C`. -1. Run `datagen` again with the `--clean` option +1. Run `datagen` again with the `--clean` option to destroy topics and schema subjects. + ```bash + datagen \ + -s examples/ecommerce.json \ + -f avro \ + --clean + ``` +1. Connect again to Materialize via `psql` and drop your `sources` and `ecommerce` clusters. + ```sql + DROP CLUSTER sources CASCADE; + DROP CLUSTER ecommerce CASCADE; + ``` +1. If you haven't already, drop the cluster replica `default.r1` to avoid accruing idle charges. The default cluster will still exist, and you can create replicas for it whenever you need to compute. 
+ ```sql + DROP CLUSTER REPLICA default.r1; + ``` ## Learn More From e3a70dc2218bbba6967df3d2600c79b8754bbec4 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Tue, 7 Mar 2023 09:23:18 -0800 Subject: [PATCH 15/17] finish draft of end-to-end tutorial --- examples/ecommerce.json | 6 +- examples/ecommerce.md | 131 +++++++++++++++++++++++++--------------- 2 files changed, 87 insertions(+), 50 deletions(-) diff --git a/examples/ecommerce.json b/examples/ecommerce.json index baa380b..c3f6378 100644 --- a/examples/ecommerce.json +++ b/examples/ecommerce.json @@ -18,7 +18,7 @@ "city": "address.city", "state": "address.state", "zipcode": "address.zipCode", - "created_at": "date.recent" + "created_at": "date.past(5)" }, { "_meta": { @@ -45,6 +45,8 @@ "id": "datatype.number(5000)", "name": "commerce.product", "price": "commerce.price", - "created_at": "date.recent" + "description": "commerce.productDescription", + "material": "commerce.productMaterial", + "created_at": "date.past(5)" } ] \ No newline at end of file diff --git a/examples/ecommerce.md b/examples/ecommerce.md index defbef1..a842fd4 100644 --- a/examples/ecommerce.md +++ b/examples/ecommerce.md @@ -27,7 +27,7 @@ Here is the input schema: "city": "address.city", "state": "address.state", "zipcode": "address.zipCode", - "created_at": "date.recent" + "created_at": "date.past(5)" }, { "_meta": { @@ -54,7 +54,9 @@ Here is the input schema: "id": "datatype.number(5000)", "name": "commerce.product", "price": "commerce.price", - "created_at": "date.recent" + "description": "commerce.productDescription", + "material": "commerce.productMaterial", + "created_at": "date.past(5)" } ] ``` @@ -69,7 +71,7 @@ Here are a couple of important ideas to note: - Each dataset has a `_meta` object that specifies the key, Kafka topic, and a list of relationships - Each relationship specifies the topic, parent field, matching child field, and how many child records should be produced for each parent record - Each dataset has several fields, where field name is mapped to a [FakerJS API](https://fakerjs.dev/api/) method. -- The primary keys use `faker.datatype()` +- The primary keys happen to use `faker.datatype()` - In effect, this limits the key space for these records. - For example, user ID is specified with `datatype.number(100)`, which means there will be a maximum of 100 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). - Since each purchase record has only one item, we will need multiple records with the same purchase ID in order to show all the different items. That means we will need to do some aggregations by purchase ID downstream (more on Materialize's `NONE` envelope, a.k.a. append-only sources later) @@ -105,7 +107,8 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry datagen \ -s examples/ecommerce.json \ -f avro \ - -n -1 + -n -1 \ + --wait 500 ``` ### Materialize @@ -162,35 +165,38 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry r1 (SIZE='2xsmall') ); ``` -1. Quit your `psql` session with `Ctrl+D` or `\q` and run a small loop in your terminal to create a sources for `users` and `items`. 
- ```bash - for i in \ - mz_datagen_ecommerce_users \ - mz_datagen_ecommerce_purchases \ - mz_datagen_ecommerce_items; do - echo "CREATE SOURCE ${i#mz_datagen_ecommerce} - IN CLUSTER sources - FROM KAFKA CONNECTION confluent_kafka - (TOPIC '$i') - KEY FORMAT BYTES - VALUE FORMAT AVRO - USING CONFLUENT SCHEMA REGISTRY CONNECTION csr - ENVELOPE UPSERT;" | \ - psql "postgres://%40:@XXX.XXXX.aws.materialize.cloud:6875/materialize?sslmode=require" - done - ``` - > :notebook: [`UPSERT` envelope](https://materialize.com/docs/sql/create-sink/#upsert-envelope) means that Kafka records of the same key will be interpreted as inserts (key doesn't exist yet), updates (key already exists), and deletes (`null` payload, a.k.a. tombstone). -1. Connect to Materialize with `psql` again and create a source for `purchases`. +1. Create `UPSERT` sources for `users` and `items`. + ```sql + CREATE SOURCE users + IN CLUSTER sources + FROM KAFKA CONNECTION confluent_kafka + (TOPIC 'mz_datagen_ecommerce_users') + KEY FORMAT BYTES + VALUE FORMAT AVRO + USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + ENVELOPE UPSERT; + ``` + ```sql + CREATE SOURCE items + IN CLUSTER sources + FROM KAFKA CONNECTION confluent_kafka + (TOPIC 'mz_datagen_ecommerce_items') + KEY FORMAT BYTES + VALUE FORMAT AVRO + USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + ENVELOPE UPSERT; + ``` + > :notebook: [`UPSERT` envelope](https://materialize.com/docs/sql/create-sink/#upsert-envelope) means that Kafka records of the same key will be interpreted as inserts (key doesn't exist yet), updates (key already exists), or deletes (`null` payload, a.k.a. tombstone). +1. Create an append-only (`ENVELOPE NONE`) source for `purchases`. ```sql CREATE SOURCE purchases - IN CLUSTER sources - FROM KAFKA CONNECTION confluent_kafka - (TOPIC 'mz_datagen_ecommerce_purchases') - KEY FORMAT BYTES - VALUE FORMAT AVRO - USING CONFLUENT SCHEMA REGISTRY CONNECTION csr - INCLUDE TIMESTAMP AS ts - ENVELOPE NONE; + IN CLUSTER sources + FROM KAFKA CONNECTION confluent_kafka + (TOPIC 'mz_datagen_ecommerce_purchases') + KEY FORMAT BYTES + VALUE FORMAT AVRO + USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + ENVELOPE NONE; ``` > :notebook: A source that uses `ENVELOPE NONE` is referred to as an [append-only](https://materialize.com/docs/sql/create-source/#append-only-envelope) source. In this case, we treat all new records as inserts, even though they have the same key. In this case, a single purchase can have multiple rows corresponding to the different items in the purchase. @@ -199,42 +205,64 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry Materialize specializes in efficient, incremental view maintenance over changing input data. Let's see it in action by computing purchase histories with joins and aggregations! 1. Connect to Materialize with `psql` using your connection string and app password. -1. Create indexes on the primary keys. +1. Use the `ecommerce` cluster. ```sql - -- + SET CLUSTER = ecommerce; ``` -1. Explore with an ad-hoc query. +1. Create a bunch of indexes. ```sql - -- + CREATE INDEX "users_idx" ON "users" ("id"); + CREATE INDEX "items_idx" ON "items" ("id"); + CREATE INDEX "purchases_idx_items" ON "purchases" ("item_id"); + CREATE INDEX "purchases_idx_users" ON "purchases" ("user_id"); ``` - > :bulb: Notice how the results are non empty! If we were generating random records, these joins would be empty because there would likely be no matches on the join conditions. 1. 
Create a view that calculates the purchase history for each user. ```sql - -- + CREATE VIEW + "purchases_agg" + AS + SELECT + "p"."id", + "list_agg"("i"."id") AS "item_ids", + "list_agg"("u"."id") AS "user_ids", + "sum"("i"."price"::"numeric") AS "total" + FROM + "purchases" AS "p" + JOIN + "items" AS "i" + ON "p"."item_id" = "i"."id" + JOIN + "users" AS "u" + ON "p"."user_id" = "u"."id" + GROUP BY + "p"."id"; ``` 1. Create an index on that view to compute the results and load them into memory for efficient point lookups. ```sql - -- + CREATE INDEX "purchases_agg_idx_user_ids" ON "purchases_agg" ("user_ids"); ``` 1. Look up the purchase history for various users. ```sql - -- + SELECT * FROM "purchases_agg" WHERE 70 = ANY ("user_ids"); ``` -1. Subscribe to changes in purchase history. + > :bulb: Notice how the results are non empty! If we were generating random records, these joins would be empty because there would likely be no matches on the join conditions. +1. Subscribe to changes in purchase history for a particular user in near-real time. ```sql - -- + COPY + (SUBSCRIBE + (SELECT + * + FROM + "purchases_agg" + WHERE + 4 = ANY ("user_ids"))) + TO + STDOUT; ``` ## Clean up 1. Quit your `datagen` with `Ctrl-C`. -1. Run `datagen` again with the `--clean` option to destroy topics and schema subjects. - ```bash - datagen \ - -s examples/ecommerce.json \ - -f avro \ - --clean - ``` 1. Connect again to Materialize via `psql` and drop your `sources` and `ecommerce` clusters. ```sql DROP CLUSTER sources CASCADE; @@ -244,6 +272,13 @@ Materialize specializes in efficient, incremental view maintenance over changing ```sql DROP CLUSTER REPLICA default.r1; ``` +1. Run `datagen` again with the `--clean` option to destroy topics and schema subjects. + ```bash + datagen \ + -s examples/ecommerce.json \ + -f avro \ + --clean + ``` ## Learn More From b6df0c92c974ee3b55e4b6cc6f0a73e98fcf1d2c Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Tue, 7 Mar 2023 09:23:43 -0800 Subject: [PATCH 16/17] small bugfix for producer --- src/dataGenerator.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dataGenerator.js b/src/dataGenerator.js index 640e53e..1abf0b2 100644 --- a/src/dataGenerator.js +++ b/src/dataGenerator.js @@ -102,8 +102,9 @@ module.exports = async ({ let registry; let avroSchemas = {}; + let producer; if(dryRun !== true){ - const producer = await connectKafkaProducer(); + producer = await connectKafkaProducer(); } for await (const iteration of asyncGenerator(number)) { global.iterationIndex = iteration; From 3d3e8daf103e303672720963503a27999563a224 Mon Sep 17 00:00:00 2001 From: chuck-alt-delete Date: Tue, 7 Mar 2023 09:25:06 -0800 Subject: [PATCH 17/17] add new --prefix option to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 49e0438..2ea4722 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,7 @@ Options: -d, --debug Output extra debugging information -w, --wait Wait time in ms between record production -rs, --record-size Record size in bytes, eg. 1048576 for 1MB + -p, --prefix Kafka topic and schema registry prefix -h, --help display help for command ```
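As background for the producer fix in patch 16 above: a `const` declared inside the `if` block is scoped to that block, so the producer was unreachable for the rest of the generator. The following is a minimal, illustrative sketch of the difference (not the datagen code itself):

```js
// Illustrative sketch of the block-scoping issue addressed in patch 16.
async function blockScoped(dryRun, connect) {
    if (dryRun !== true) {
        const producer = await connect(); // visible only inside this block
    }
    // `producer` does not exist here; any later use throws a ReferenceError.
}

async function hoisted(dryRun, connect) {
    let producer; // declared in the enclosing scope, as the patch does
    if (dryRun !== true) {
        producer = await connect();
    }
    return producer; // reachable by the rest of the run (undefined on a dry run)
}
```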