diff --git a/integrations/debezium/README.md b/integrations/debezium/README.md new file mode 100644 index 0000000..34e49b8 --- /dev/null +++ b/integrations/debezium/README.md @@ -0,0 +1,42 @@ +# Materialize + Debezium Examples + +## Overview + +This is a collection of demos that show how to use the [Debezium](https://materialize.com/docs/ingest-data/debezium/) connector with Materialize. + +## Demos + +| Demo | Description | Materialize Docs | +| ---------------------------------- | --------------------------------------------------------------------------- | ------------------------------------------------------------------------------------ | +| [Postgres](postgres) | Connect to a Postgres database and stream changes to Kafka/Redpanda | [Postgres](https://materialize.com/docs/ingest-data/cdc-postgres-kafka-debezium/) | +| [MySQL](mysql) | Connect to a MySQL database and stream changes to Kafka/Redpanda | [MySQL](https://materialize.com/docs/ingest-data/cdc-mysql/) | +| [SQL Server](sqlserver) | Connect to a SQL Server database and stream changes to Kafka/Redpanda | TODO | +| [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | WIP: This demo is not yet complete. | + +## Prerequisites + +- [Docker](https://docs.docker.com/get-docker/) +- [Docker Compose](https://docs.docker.com/compose/install/) +- [Materialize](https://console.materialize.com/) account + +## Running the demos + +For each demo, follow the instructions in the demo's README. All demos assume that you have `psql`, `docker`, and a publicly accessible Linux environment. + +## Notes + +Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. 
To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory: + +- `kafka-connect-avro-converter` + +- `kafka-connect-avro-data` + +- `kafka-avro-serializer` + +- `kafka-schema-serializer` + +- `kafka-schema-registry-client` + +- `common-config` + +- `common-utils` diff --git a/integrations/debezium/mongodb/README.md b/integrations/debezium/mongodb/README.md new file mode 100644 index 0000000..bd3e8ce --- /dev/null +++ b/integrations/debezium/mongodb/README.md @@ -0,0 +1,180 @@ +# Debezium + MongoDB + Materialize + +> {notice}: WIP: This demo is a work in progress. It is not yet ready for use. + +- When trying to create a source with envelope debezium, the following error is thrown: + + ```sql + ERROR: 'before' column must be of type record + ``` + +An example of the records generated: + +- Insert: +```sql +-[ RECORD 1 ]-----+------------------------------------------------------------------------------------------------------------------ +id | 102 +before | +after | {"_id":{"$numberLong":"102"},"description":"12V car battery","name":"car battery","quantity":8,"weight":8.1} +updateDescription | +source | (2.4.0.Final,mongodb,dbserver1,0,true,inventory,,rs0,products,-1,,,) +op | r +ts_ms | 1698676832093 +transaction | +``` + +- Update: +```sql +-[ RECORD 2 ]-----+------------------------------------------------------------------------------------------------------------------ +id | 101 +before | {"_id":{"$numberLong":"101"},"description":"Small 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14} +after | {"_id":{"$numberLong":"101"},"description":"Updated 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14} +updateDescription | (,"{""description"":""Updated 2-wheel scooter""}",) +source | (2.4.0.Final,mongodb,dbserver1,1698676924000,false,inventory,,rs0,products,1,,,1698676924686) +op | u +ts_ms | 1698676924707 +transaction | + ``` + +--- + +Before 
trying this out, you will need the following: + +- [Materialize account](https://materialize.com/register/). +- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. + +## Running the demo + +If you want to try it right now, follow these steps: + +1. Clone the project on your Linux server and run: + + ```shell session + git clone https://github.com/MaterializeInc/demos.git + cd demos/integrations/debezium/mongodb + ``` + +1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example: + + ```shell session + export EXTERNAL_IP=$(hostname -I | awk '{print $1}') + + # Check the value of EXTERNAL_IP + echo $EXTERNAL_IP + ``` + +1. Bring up only the MongoDB container in the background. + + ```shell session + docker compose up -d --build mongodb + ``` + +1. Initialize the MongoDB replica set. + + ```shell session + docker compose exec mongodb bash -c "/usr/local/bin/init-inventory.sh" + ``` + +1. Bring up the rest of the Docker containers in the background. + + ```shell session + docker compose up -d --build + ``` + + **This may take one or two minutes to complete the first time you run it.** If all goes well, you'll have every service running in its own container, with Debezium configured to ship changes from MongoDB into Redpanda. + +1. Confirm that everything is running as expected: + + ```shell session + docker compose ps + ``` + +1. Exec into the Redpanda container to look around using Redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI. + + ```shell session + docker compose exec redpanda /bin/bash + + rpk debug info + + rpk topic list + ``` +1. 
Connect to Materialize. + +If you already have `psql` installed on your machine, use the provided connection string to connect: + +Example: + + ```shell session + psql "postgres://user%40domain.com@materialize_host:6875/materialize" + ``` + +Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools). + +1. Now that you're connected to Materialize, define the connection to the Redpanda broker and the schema registry: + + ```sql + -- Create Redpanda connection + CREATE CONNECTION redpanda_connection + TO KAFKA ( + BROKER ''); + + -- Create Registry connection + CREATE CONNECTION schema_registry + TO CONFLUENT SCHEMA REGISTRY ( + URL 'http://'); + ``` + +1. Next, define all of the collections in the `inventory` database as sources: + + ```sql + CREATE SOURCE customers + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.customers') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE orders + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.orders') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE products + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.products') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + ``` + + Because the three sources pull message schema data from the registry, Materialize knows the column types to use for each attribute. + + +1. 
Select from one of the sources to see the data: + + ```sql + SELECT * FROM customers LIMIT 5; + ``` + +## Cleanup + +To stop the services and remove the containers, run: + +```shell session +docker compose down +``` + +In Materialize, run: + +```sql +DROP CONNECTION redpanda_connection CASCADE; +DROP CONNECTION schema_registry CASCADE; +``` + +## Helpful resources: + +* [`CREATE SOURCE`](https://materialize.com/docs/sql/create-source) +* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view) + +## Community + +If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)! diff --git a/integrations/debezium/mongodb/connect/Dockerfile b/integrations/debezium/mongodb/connect/Dockerfile new file mode 100644 index 0000000..e96f8b5 --- /dev/null +++ b/integrations/debezium/mongodb/connect/Dockerfile @@ -0,0 +1,49 @@ +FROM debezium/connect-base:2.4 + +# +# Set up the plugins directory ... +# +ENV CONFLUENT_VERSION=7.0.1 \ + AVRO_VERSION=1.10.1 \ + GUAVA_VERSION=31.0.1-jre + +RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \ + docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \ + docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \ + docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \ + docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \ + docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \ + docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \ + docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \ + 
docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7 + +# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile +LABEL maintainer="Debezium Community" + +ENV DEBEZIUM_VERSION="2.4.0.Final" \ + MAVEN_REPO_CENTRAL="" \ + MAVEN_REPOS_ADDITIONAL="" \ + MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \ + MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \ + MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \ + POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \ + SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \ + ORACLE_MD5=21699814400860457dc2334b165882e6 \ + DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \ + SPANNER_MD5=186b07595e914e9139941889fd675044 \ + VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \ + JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \ + KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \ + SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957 + +RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \ + docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \ + docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \ + docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \ + docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \ + docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \ + docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \ + docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \ + docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \ + docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \ + docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5" diff --git a/integrations/debezium/mongodb/deploy/Dockerfile b/integrations/debezium/mongodb/deploy/Dockerfile 
new file mode 100644 index 0000000..4c0db5a --- /dev/null +++ b/integrations/debezium/mongodb/deploy/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:latest + +RUN apt-get update && apt-get -qy install curl + +COPY . /deploy + +COPY docker-entrypoint.sh /usr/local/bin +RUN chmod 777 /usr/local/bin/docker-entrypoint.sh + +ENTRYPOINT ["docker-entrypoint.sh"] diff --git a/integrations/debezium/mongodb/deploy/docker-entrypoint.sh b/integrations/debezium/mongodb/deploy/docker-entrypoint.sh new file mode 100644 index 0000000..f63ba9d --- /dev/null +++ b/integrations/debezium/mongodb/deploy/docker-entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +cd /deploy + +bash mongo_dbz.sh diff --git a/integrations/debezium/mongodb/deploy/mongo_dbz.sh b/integrations/debezium/mongodb/deploy/mongo_dbz.sh new file mode 100755 index 0000000..c334bcc --- /dev/null +++ b/integrations/debezium/mongodb/deploy/mongo_dbz.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#Initialize Debezium (Kafka Connect Component) + +while true; do + echo "Waiting for Debezium to be ready" + sleep 0.1 + curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200 + if [ $? -eq 0 ]; then + echo "Debezium is ready" + break + fi +done + +# Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable +sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mongodb.json + +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mongodb.json + +if [ $? 
-eq 0 ]; then + echo "Debezium connector registered" +else + echo "Debezium connector registration failed" + exit 1 +fi diff --git a/integrations/debezium/mongodb/deploy/register-mongodb.json b/integrations/debezium/mongodb/deploy/register-mongodb.json new file mode 100644 index 0000000..bd8f4e9 --- /dev/null +++ b/integrations/debezium/mongodb/deploy/register-mongodb.json @@ -0,0 +1,18 @@ +{ + "name": "inventory-connector", + "config": { + "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector", + "tasks.max" : "1", + "topic.prefix" : "dbserver1", + "mongodb.user" : "debezium", + "mongodb.password" : "dbz", + "mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0", + "capture.mode": "change_streams_update_full_with_pre_image", + "database.include.list" : "inventory", + "database.history.kafka.bootstrap.servers": "EXTERNAL_IP:9092", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": "http://EXTERNAL_IP:8081", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": "http://EXTERNAL_IP:8081" + } +} diff --git a/integrations/debezium/mongodb/docker-compose.yaml b/integrations/debezium/mongodb/docker-compose.yaml new file mode 100644 index 0000000..f8de066 --- /dev/null +++ b/integrations/debezium/mongodb/docker-compose.yaml @@ -0,0 +1,66 @@ +version: '3.9' +services: + redpanda: + image: docker.vectorized.io/vectorized/redpanda:v21.10.1 + container_name: redpanda + command: + - redpanda start + - --overprovisioned + - --smp 1 + - --memory 1G + - --reserve-memory 0M + - --node-id 0 + - --check=false + - --kafka-addr 0.0.0.0:9092 + - --advertise-kafka-addr ${EXTERNAL_IP:-redpanda}:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr ${EXTERNAL_IP:-redpanda}:8082 + - --set redpanda.enable_transactions=true + - --set redpanda.enable_idempotence=true + ports: + - 9092:9092 + - 8081:8081 + - 8082:8082 + healthcheck: {test: 
curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s} + mongodb: + # image: quay.io/debezium/example-mongodb:2.4 + build: ./mongodb + hostname: mongodb + ports: + - 27017:27017 + environment: + - MONGODB_USER=debezium + - MONGODB_PASSWORD=dbz + # volumes: + # - ./mongodb/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 1s + timeout: 10s + retries: 5 + debezium: + #image: debezium/connect:2.4 + build: ./connect + container_name: debezium + environment: + BOOTSTRAP_SERVERS: ${EXTERNAL_IP:-redpanda}:9092 + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: connect_configs + OFFSET_STORAGE_TOPIC: connect_offsets + STATUS_STORAGE_TOPIC: connect_status + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081 + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081 + ports: + - 8083:8083 + healthcheck: {test: curl -f localhost:8083, interval: 1s, start_period: 120s} + depends_on: + redpanda: {condition: service_healthy} + mongodb: {condition: service_healthy} + debezium_deploy: + build: ./deploy + depends_on: + redpanda: {condition: service_healthy} + mongodb: {condition: service_healthy} + debezium: {condition: service_healthy} + environment: + - EXTERNAL_IP=${EXTERNAL_IP:-redpanda} diff --git a/integrations/debezium/mongodb/mongodb/Dockerfile b/integrations/debezium/mongodb/mongodb/Dockerfile new file mode 100644 index 0000000..97ad777 --- /dev/null +++ b/integrations/debezium/mongodb/mongodb/Dockerfile @@ -0,0 +1,14 @@ +FROM mongo:6.0.11 + +LABEL maintainer="Debezium Community" + +COPY init-inventory.sh /usr/local/bin/ +RUN chmod +x /usr/local/bin/init-inventory.sh + +# Starting with MongoDB 4.4 the authentication enabled MongoDB requires a key +# for intra-replica set communication +RUN openssl rand -base64 756 > /etc/mongodb.keyfile &&\ + chown mongodb /etc/mongodb.keyfile &&\ + chmod 400 /etc/mongodb.keyfile + 
+CMD ["mongod", "--replSet", "rs0", "--auth", "--keyFile", "/etc/mongodb.keyfile"] diff --git a/integrations/debezium/mongodb/mongodb/init-inventory.sh b/integrations/debezium/mongodb/mongodb/init-inventory.sh new file mode 100644 index 0000000..f54a86b --- /dev/null +++ b/integrations/debezium/mongodb/mongodb/init-inventory.sh @@ -0,0 +1,112 @@ +HOSTNAME=`hostname` + + OPTS=`getopt -o h: --long hostname: -n 'parse-options' -- "$@"` + if [ $? != 0 ] ; then echo "Failed parsing options." >&2 ; exit 1 ; fi + + echo "$OPTS" + eval set -- "$OPTS" + + while true; do + case "$1" in + -h | --hostname ) HOSTNAME=$2; shift; shift ;; + -- ) shift; break ;; + * ) break ;; + esac + done +echo "Using HOSTNAME='$HOSTNAME'" + +mongosh localhost:27017/inventory <<-EOF + rs.initiate({ + _id: "rs0", + members: [ { _id: 0, host: "${HOSTNAME}:27017" } ] + }); +EOF +echo "Initiated replica set" + +sleep 3 +mongosh localhost:27017/admin <<-EOF + db.createUser({ user: 'admin', pwd: 'admin', roles: [ { role: "userAdminAnyDatabase", db: "admin" } ] }); +EOF + +mongosh -u admin -p admin localhost:27017/admin <<-EOF + db.runCommand({ + createRole: "listDatabases", + privileges: [ + { resource: { cluster : true }, actions: ["listDatabases"]} + ], + roles: [] + }); + + db.runCommand({ + createRole: "readChangeStream", + privileges: [ + { resource: { db: "", collection: ""}, actions: [ "find", "changeStream" ] } + ], + roles: [] + }); + + db.createUser({ + user: 'debezium', + pwd: 'dbz', + roles: [ + { role: "readWrite", db: "inventory" }, + { role: "read", db: "local" }, + { role: "listDatabases", db: "admin" }, + { role: "readChangeStream", db: "admin" }, + { role: "read", db: "config" }, + { role: "read", db: "admin" } + ] + }); +EOF + +mongosh -u admin -p admin localhost:27017/admin --authenticationDatabase admin <<-EOF + db.grantRolesToUser("admin", [{ role: "readWrite", db: "inventory" }]); +EOF + +mongosh -u admin -p admin localhost:27017/inventory --authenticationDatabase admin <<-EOF + 
db.createCollection("products", { + changeStreamPreAndPostImages: { enabled: true } + }); + + db.createCollection("customers", { + changeStreamPreAndPostImages: { enabled: true } + }); + + db.createCollection("orders", { + changeStreamPreAndPostImages: { enabled: true } + }); +EOF + +echo "Created users" + +mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory <<-EOF + use inventory; + + db.products.insert([ + { _id : NumberLong("101"), name : 'scooter', description: 'Small 2-wheel scooter', weight : 3.14, quantity : NumberInt("3") }, + { _id : NumberLong("102"), name : 'car battery', description: '12V car battery', weight : 8.1, quantity : NumberInt("8") }, + { _id : NumberLong("103"), name : '12-pack drill bits', description: '12-pack of drill bits with sizes ranging from #40 to #3', weight : 0.8, quantity : NumberInt("18") }, + { _id : NumberLong("104"), name : 'hammer', description: "12oz carpenter's hammer", weight : 0.75, quantity : NumberInt("4") }, + { _id : NumberLong("105"), name : 'hammer', description: "14oz carpenter's hammer", weight : 0.875, quantity : NumberInt("5") }, + { _id : NumberLong("106"), name : 'hammer', description: "16oz carpenter's hammer", weight : 1.0, quantity : NumberInt("0") }, + { _id : NumberLong("107"), name : 'rocks', description: 'box of assorted rocks', weight : 5.3, quantity : NumberInt("44") }, + { _id : NumberLong("108"), name : 'jacket', description: 'water resistent black wind breaker', weight : 0.1, quantity : NumberInt("2") }, + { _id : NumberLong("109"), name : 'spare tire', description: '24 inch spare tire', weight : 22.2, quantity : NumberInt("5") } + ]); + + db.customers.insert([ + { _id : NumberLong("1001"), first_name : 'Sally', last_name : 'Thomas', email : 'sally.thomas@acme.com' }, + { _id : NumberLong("1002"), first_name : 'George', last_name : 'Bailey', email : 'gbailey@foobar.com' }, + { _id : NumberLong("1003"), first_name : 'Edward', last_name : 'Walker', email : 
'ed@walker.com' }, + { _id : NumberLong("1004"), first_name : 'Anne', last_name : 'Kretchmar', email : 'annek@noanswer.org' } + ]); + + db.orders.insert([ + { _id : NumberLong("10001"), order_date : new ISODate("2016-01-16T00:00:00Z"), purchaser_id : NumberLong("1001"), quantity : NumberInt("1"), product_id : NumberLong("102") }, + { _id : NumberLong("10002"), order_date : new ISODate("2016-01-17T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("105") }, + { _id : NumberLong("10003"), order_date : new ISODate("2016-02-19T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("106") }, + { _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("1003"), quantity : NumberInt("1"), product_id : NumberLong("107") } + ]); +EOF + +echo "Inserted example data" diff --git a/integrations/debezium/mongodb/mongodb/mongo-init.js b/integrations/debezium/mongodb/mongodb/mongo-init.js new file mode 100644 index 0000000..ab12966 --- /dev/null +++ b/integrations/debezium/mongodb/mongodb/mongo-init.js @@ -0,0 +1,51 @@ +// Initialize Replica Set +rs.initiate({ + _id: "rs0", + members: [{ _id: 0, host: "localhost:27017" }] +}); + +print("Initiated replica set"); + +// Sleep is not available in this context, so we rely on MongoDB to wait until the RS is initiated +// before proceeding with the rest of the script + +// Create admin user +db = db.getSiblingDB('admin'); +db.createUser({ + user: 'admin', + pwd: 'admin', + roles: [{ role: "userAdminAnyDatabase", db: "admin" }] +}); + +// Define new roles +db.createRole({ + role: "listDatabases", + privileges: [{ + resource: { cluster: true }, + actions: ["listDatabases"] + }], + roles: [] +}); + +db.createRole({ + role: "readChangeStream", + privileges: [{ + resource: { db: "", collection: "" }, + actions: ["find", "changeStream"] + }], + roles: [] +}); + +// Create debezium user +db.createUser({ + 
user: 'debezium', + pwd: 'dbz', + roles: [ + { role: "readWrite", db: "inventory" }, + { role: "read", db: "local" }, + { role: "listDatabases", db: "admin" }, + { role: "readChangeStream", db: "admin" }, + { role: "read", db: "config" }, + { role: "read", db: "admin" } + ] +}); diff --git a/integrations/debezium/mysql/.env.example b/integrations/debezium/mysql/.env.example new file mode 100644 index 0000000..e69de29 diff --git a/integrations/debezium/mysql/README.md b/integrations/debezium/mysql/README.md new file mode 100644 index 0000000..eef9585 --- /dev/null +++ b/integrations/debezium/mysql/README.md @@ -0,0 +1,179 @@ +# Debezium + MySQL + Materialize + +Before trying this out, you will need the following: + +- [Materialize account](https://materialize.com/register/). +- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. + +## Running the demo + +If you want to try it right now, follow these steps: + +1. Clone the project on your Linux server and run: + + ```shell session + git clone https://github.com/MaterializeInc/demos.git + cd demos/integrations/debezium/mysql + ``` + +1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example: + + ```shell session + export EXTERNAL_IP=$(hostname -I | awk '{print $1}') + + # Check the value of EXTERNAL_IP + echo $EXTERNAL_IP + ``` + +1. Bring up the Docker Compose containers in the background. + + ```shell session + docker compose up -d --build + ``` + + **This may take one or two minutes to complete the first time you run it.** If all goes well, you'll have every service running in its own container, with Debezium configured to ship changes from MySQL into Redpanda. + +1. Confirm that everything is running as expected: + + ```shell session + docker compose ps + ``` + +1. 
Exec into the Redpanda container to look around using Redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI. + + ```shell session + docker compose exec redpanda /bin/bash + + rpk debug info + + rpk topic list + ``` +1. Connect to Materialize. + +If you already have `psql` installed on your machine, use the provided connection string to connect: + +Example: + + ```shell session + psql "postgres://user%40domain.com@materialize_host:6875/materialize" + ``` + +Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools). + +1. Now that you're in Materialize, define the connection to the Redpanda broker and the schema registry: + + ```sql + -- Create Redpanda connection + CREATE CONNECTION redpanda_connection + TO KAFKA ( + BROKER ''); + + -- Create Registry connection + CREATE CONNECTION schema_registry + TO CONFLUENT SCHEMA REGISTRY ( + URL 'http://'); + ``` + +1. Next, define all of the tables in `demo` as sources: + + ```sql + CREATE SOURCE users + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.users') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE roles + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.roles') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE reviews + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.reviews') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + ``` + + Because the three sources pull message schema data from the registry, Materialize knows the column types to use for each attribute. + +1. 
Create a materialized view that has only the VIP users: + + ```sql + CREATE MATERIALIZED VIEW vip_users AS + SELECT + users.id, + users.name, + users.email, + users.role_id, + roles.name AS role_name + FROM users + JOIN roles ON users.role_id = roles.id + WHERE users.role_id = 4; + ``` + +1. Create a materialized view that has only the bad reviews: + + ```sql + CREATE MATERIALIZED VIEW bad_reviews AS + SELECT + reviews.user_id, + reviews.comment, + reviews.rating, + reviews.created_at, + reviews.updated_at + FROM reviews + WHERE reviews.rating < 4; + ``` + +1. Create a materialized view that filters all VIP users with bad reviews: + + ```sql + CREATE MATERIALIZED VIEW vip_users_with_bad_reviews AS + SELECT + vip_users.name, + vip_users.email, + vip_users.role_name, + bad_reviews.rating, + bad_reviews.comment + FROM vip_users + JOIN bad_reviews ON vip_users.id = bad_reviews.user_id; + ``` + +1. Query the materialized view: + + ```sql + SELECT * FROM vip_users_with_bad_reviews; + ``` + + Or use the `SUBSCRIBE` command to stream the results: + + ```sql + COPY (SUBSCRIBE vip_users_with_bad_reviews) TO STDOUT; + ``` + +## Cleanup + +To stop the services and remove the containers, run: + +```shell session +docker compose down +``` + +In Materialize, run: + +```sql +DROP CONNECTION redpanda_connection CASCADE; +DROP CONNECTION schema_registry CASCADE; +``` + +## Helpful resources: + +* [`CREATE CONNECTION`](https://materialize.com/docs/sql/create-connection/) +* [`MySQL CDC using Kafka and Debezium`](https://materialize.com/docs/ingest-data/cdc-mysql/) +* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view) + +## Community + +If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)! 
diff --git a/integrations/debezium/mysql/connect/Dockerfile b/integrations/debezium/mysql/connect/Dockerfile new file mode 100644 index 0000000..e96f8b5 --- /dev/null +++ b/integrations/debezium/mysql/connect/Dockerfile @@ -0,0 +1,49 @@ +FROM debezium/connect-base:2.4 + +# +# Set up the plugins directory ... +# +ENV CONFLUENT_VERSION=7.0.1 \ + AVRO_VERSION=1.10.1 \ + GUAVA_VERSION=31.0.1-jre + +RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \ + docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \ + docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \ + docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \ + docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \ + docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \ + docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \ + docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \ + docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7 + +# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile +LABEL maintainer="Debezium Community" + +ENV DEBEZIUM_VERSION="2.4.0.Final" \ + MAVEN_REPO_CENTRAL="" \ + MAVEN_REPOS_ADDITIONAL="" \ + MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \ + MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \ + MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \ + POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \ + SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \ + ORACLE_MD5=21699814400860457dc2334b165882e6 \ + DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \ + 
SPANNER_MD5=186b07595e914e9139941889fd675044 \ + VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \ + JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \ + KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \ + SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957 + +RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \ + docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \ + docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \ + docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \ + docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \ + docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \ + docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \ + docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \ + docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \ + docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \ + docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5" diff --git a/integrations/debezium/mysql/deploy/Dockerfile b/integrations/debezium/mysql/deploy/Dockerfile new file mode 100644 index 0000000..737b101 --- /dev/null +++ b/integrations/debezium/mysql/deploy/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:latest + +RUN apt-get update && apt-get -qy install curl mysql-client + +COPY . 
/deploy + +COPY docker-entrypoint.sh /usr/local/bin +RUN chmod 777 /usr/local/bin/docker-entrypoint.sh + +ENTRYPOINT ["docker-entrypoint.sh"] diff --git a/integrations/debezium/mysql/deploy/docker-entrypoint.sh b/integrations/debezium/mysql/deploy/docker-entrypoint.sh new file mode 100644 index 0000000..c00a2ef --- /dev/null +++ b/integrations/debezium/mysql/deploy/docker-entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +cd /deploy + +bash mysql_dbz.sh diff --git a/integrations/debezium/mysql/deploy/mysql_dbz.sh b/integrations/debezium/mysql/deploy/mysql_dbz.sh new file mode 100755 index 0000000..240f2cb --- /dev/null +++ b/integrations/debezium/mysql/deploy/mysql_dbz.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +# Initialize Debezium (Kafka Connect component) + +while true; do + echo "Waiting for Debezium to be ready" + sleep 0.1 + curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200 + if [ $? -eq 0 ]; then + echo "Debezium is ready" + break + fi +done + +# Replace the EXTERNAL_IP placeholder in the connector config with the value of the EXTERNAL_IP environment variable, then register the connector +sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mysql.json + +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mysql.json + +if [ $? -eq 0 ]; then + echo "Debezium connector registered" +else + echo "Debezium connector registration failed" + exit 1 +fi + +## +# Reviews generation mock script +# Table details: +# - name: reviews +# - columns: +# - id +# - user_id +# - comment +# - rating +# - created_at +# - updated_at +## + +# Start generating reviews +echo "Generating reviews..." 
+id=1 +while [[ true ]] ; do + + # Define variables + user_role=$(seq 1 4 | sort -R | head -n1) + review_rating=$(seq 1 10 | sort -R | head -n1) + review_text="Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." + + echo "Generating review for user_id: ${id}" + + # Generate users + # Assuming you have added name and role_id columns to your demo.users table + mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.users (id, name, email, role_id) VALUES ( ${id}, 'user${id}', 'user${id}@demo.com', ${user_role} );" 2> /dev/null + + # Generate reviews + # Assuming you have updated column names in your demo.reviews table + mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.reviews (user_id, comment, rating, created_at, updated_at) VALUES ( ${id}, '${review_text}', ${review_rating}, NOW(), NOW() );" 2> /dev/null + + # Increment id + ((id=id+1)) + + # Sleep for 1 second + sleep 1 + +done diff --git a/integrations/debezium/mysql/deploy/register-mysql.json b/integrations/debezium/mysql/deploy/register-mysql.json new file mode 100644 index 0000000..5619402 --- /dev/null +++ b/integrations/debezium/mysql/deploy/register-mysql.json @@ -0,0 +1,23 @@ +{ + "name": "mysql-connector", + "config": { + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "tasks.max": "1", + "database.hostname": "mysql", + "database.port": "3306", + "database.user": "debezium", + "database.password": "mysqlpwd", + "database.server.id":"223344", + "topic.prefix": "mysql_repl", + "database.include.list": "demo", + "database.history.kafka.topic":"mysql_repl.history", + 
"database.history.kafka.bootstrap.servers":"EXTERNAL_IP:9092", + "schema.history.internal.kafka.bootstrap.servers": "EXTERNAL_IP:9092", + "schema.history.internal.kafka.topic": "mysql_repl.internal.history", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": "http://redpanda:8081", + "value.converter.schema.registry.url": "http://redpanda:8081", + "include.schema.changes": false + } + } diff --git a/integrations/debezium/mysql/docker-compose.yaml b/integrations/debezium/mysql/docker-compose.yaml new file mode 100644 index 0000000..dc0b07b --- /dev/null +++ b/integrations/debezium/mysql/docker-compose.yaml @@ -0,0 +1,60 @@ +services: + mysql: + image: mysql/mysql-server:8.0.27 + ports: + - 3306:3306 + environment: + - MYSQL_ROOT_PASSWORD=mysqlpwd + - MYSQL_USER=mysqluser + - MYSQL_PASSWORD=mysqlpwd + volumes: + - ./mysql/mysql.cnf:/etc/mysql/conf.d + - ./mysql/mysql_bootstrap.sql:/docker-entrypoint-initdb.d/mysql_bootstrap.sql + healthcheck: {test: mysql -p$$MYSQL_PASSWORD -e 'select 1', interval: 1s, start_period: 60s} + redpanda: + image: docker.vectorized.io/vectorized/redpanda:v21.11.2 + command: + - redpanda start + - --overprovisioned + - --smp 1 + - --memory 1G + - --reserve-memory 0M + - --node-id 0 + - --check=false + - --kafka-addr 0.0.0.0:9092 + - --advertise-kafka-addr ${EXTERNAL_IP:-redpanda}:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr ${EXTERNAL_IP:-redpanda}:8082 + - --set redpanda.enable_transactions=true + - --set redpanda.enable_idempotence=true + ports: + - 9092:9092 + - 8081:8081 + - 8082:8082 + healthcheck: {test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s} + debezium: + build: ./connect + environment: + BOOTSTRAP_SERVERS: ${EXTERNAL_IP:-redpanda}:9092 + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: connect_configs + OFFSET_STORAGE_TOPIC: connect_offsets + KEY_CONVERTER: 
io.confluent.connect.avro.AvroConverter + VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081 + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081 + volumes: + - ${PWD}/mysql:/data + ports: + - 8083:8083 + healthcheck: {test: curl -f localhost:8083, interval: 1s, start_period: 120s} + depends_on: + redpanda: {condition: service_healthy} + mysql: {condition: service_healthy} + debezium_deploy: + build: ./deploy + depends_on: + debezium: {condition: service_healthy} + environment: + - MYSQL_PASSWORD='mysqlpwd' + - EXTERNAL_IP=${EXTERNAL_IP:-redpanda} diff --git a/integrations/debezium/mysql/mysql/mysql.cnf b/integrations/debezium/mysql/mysql/mysql.cnf new file mode 100644 index 0000000..8d5bb9f --- /dev/null +++ b/integrations/debezium/mysql/mysql/mysql.cnf @@ -0,0 +1,5 @@ +[mysqld] +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row diff --git a/integrations/debezium/mysql/mysql/mysql_bootstrap.sql b/integrations/debezium/mysql/mysql/mysql_bootstrap.sql new file mode 100644 index 0000000..78e5c07 --- /dev/null +++ b/integrations/debezium/mysql/mysql/mysql_bootstrap.sql @@ -0,0 +1,37 @@ +CREATE DATABASE IF NOT EXISTS demo; +USE demo; + +GRANT ALL PRIVILEGES ON demo.* TO 'mysqluser'; + +CREATE USER 'debezium' IDENTIFIED WITH mysql_native_password BY 'mysqlpwd'; + +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'; + +FLUSH PRIVILEGES; + +CREATE TABLE IF NOT EXISTS demo.users +( + id SERIAL PRIMARY KEY, + role_id INT NOT NULL, + name VARCHAR(255) NOT NULL, + email VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS demo.roles ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL +); + +INSERT INTO demo.roles (name) VALUES ('admin'), ('user'), ('guest'), ('vip'); + +-- Create reviews 
table +CREATE TABLE IF NOT EXISTS demo.reviews ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL, + rating INT NOT NULL, + comment TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); diff --git a/integrations/debezium/postgres/README.md b/integrations/debezium/postgres/README.md new file mode 100644 index 0000000..70bf436 --- /dev/null +++ b/integrations/debezium/postgres/README.md @@ -0,0 +1,182 @@ +# Debezium + PostgreSQL + Materialize + +Before trying this out, you will need the following: + +- [Materialize account](https://materialize.com/register/). +- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. + +## Running the demo + +If you want to try it right now, follow these steps: + +1. Clone the project on your Linux server and run: + + ```shell session + git clone https://github.com/MaterializeInc/demos.git + cd demos/integrations/debezium/postgres + ``` + +1. After cloning the project, set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example: + + ```shell session + export EXTERNAL_IP=$(hostname -I | awk '{print $1}') + + # Check the value of EXTERNAL_IP + echo $EXTERNAL_IP + ``` + +1. Bring up the Docker Compose containers in the background. + + ```shell session + docker compose up -d --build + ``` + + **This may take one or two minutes to complete the first time you run it.** If all goes well, each service will be running in its own container, with Debezium configured to ship changes from Postgres into Redpanda. + +1. Confirm that everything is running as expected: + + ```shell session + docker compose ps + ``` + +1. Exec into the Redpanda container to look around using Redpanda's [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI:
+ + ```shell session + docker compose exec redpanda /bin/bash + + rpk debug info + + rpk topic list + ``` +1. Connect to Materialize. + +If you already have `psql` installed on your machine, use the provided connection string to connect: + +Example: + + ```shell session + psql "postgres://user%40domain.com@materialize_host:6875/materialize" + ``` + +Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools). + +1. Now that you're in Materialize, define the connection to the Redpanda broker and the schema registry: + + ```sql + -- Create Redpanda connection + CREATE CONNECTION redpanda_connection + TO KAFKA ( + BROKER ''); + + -- Create Registry connection + CREATE CONNECTION schema_registry + TO CONFLUENT SCHEMA REGISTRY ( + URL 'http://'); + ``` + +1. Next, define all of the tables in `demo` as sources: + + ```sql + CREATE SOURCE users + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'pg_repl.demo.users') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE roles + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'pg_repl.demo.roles') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE reviews + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'pg_repl.demo.reviews') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + ``` + + Because the three sources pull their message schemas from the registry, Materialize knows which column type to use for each attribute. + +1.
Create a materialized view that has only the VIP users: + + ```sql + CREATE MATERIALIZED VIEW vip_users AS + SELECT + users.id, + users.first_name, + users.last_name, + users.email, + users.role_id, + roles.name AS role_name + FROM users + JOIN roles ON users.role_id = roles.id + WHERE users.role_id = 4; + ``` + +1. Create a materialized view that has only the bad reviews: + + ```sql + CREATE MATERIALIZED VIEW bad_reviews AS + SELECT + reviews.user_id, + reviews.comment, + reviews.rating, + reviews.created_at, + reviews.updated_at + FROM reviews + WHERE reviews.rating < 4; + ``` + +1. Create a materialized view that filters all VIP users with bad reviews: + + ```sql + CREATE MATERIALIZED VIEW vip_users_with_bad_reviews AS + SELECT + vip_users.first_name, + vip_users.last_name, + vip_users.email, + vip_users.role_name, + bad_reviews.rating, + bad_reviews.comment + FROM vip_users + JOIN bad_reviews ON vip_users.id = bad_reviews.user_id; + ``` + +1. Query the materialized view: + + ```sql + SELECT * FROM vip_users_with_bad_reviews; + ``` + + Or use the `SUBSCRIBE` command to stream the results: + + ```sql + COPY (SUBSCRIBE vip_users_with_bad_reviews) TO STDOUT; + ``` + +## Cleanup + +To stop the services and remove the containers, run: + +```shell session +docker compose down +``` + +In Materialize, run: + +```sql +DROP CONNECTION redpanda_connection CASCADE; +DROP CONNECTION schema_registry CASCADE; +``` + +## Helpful resources: + +* [`CREATE SOURCE: PostgreSQL`](https://materialize.com/docs/sql/create-source/postgres) +* [`Postgres + Kafka + Debezium`](https://materialize.com/docs/integrations/cdc-postgres/#kafka--debezium) +* [`CREATE SOURCE`](https://materialize.com/docs/sql/create-source) +* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view) + +## Community + +If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)! 
diff --git a/integrations/debezium/postgres/connect/Dockerfile b/integrations/debezium/postgres/connect/Dockerfile new file mode 100644 index 0000000..e96f8b5 --- /dev/null +++ b/integrations/debezium/postgres/connect/Dockerfile @@ -0,0 +1,49 @@ +FROM debezium/connect-base:2.4 + +# +# Set up the plugins directory ... +# +ENV CONFLUENT_VERSION=7.0.1 \ + AVRO_VERSION=1.10.1 \ + GUAVA_VERSION=31.0.1-jre + +RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \ + docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \ + docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \ + docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \ + docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \ + docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \ + docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \ + docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \ + docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7 + +# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile +LABEL maintainer="Debezium Community" + +ENV DEBEZIUM_VERSION="2.4.0.Final" \ + MAVEN_REPO_CENTRAL="" \ + MAVEN_REPOS_ADDITIONAL="" \ + MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \ + MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \ + MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \ + POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \ + SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \ + ORACLE_MD5=21699814400860457dc2334b165882e6 \ + DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \ + 
SPANNER_MD5=186b07595e914e9139941889fd675044 \ + VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \ + JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \ + KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \ + SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957 + +RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \ + docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \ + docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \ + docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \ + docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \ + docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \ + docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \ + docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \ + docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \ + docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \ + docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5" diff --git a/integrations/debezium/postgres/deploy/Dockerfile b/integrations/debezium/postgres/deploy/Dockerfile new file mode 100644 index 0000000..4c0db5a --- /dev/null +++ b/integrations/debezium/postgres/deploy/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:latest + +RUN apt-get update && apt-get -qy install curl + +COPY . 
/deploy + +COPY docker-entrypoint.sh /usr/local/bin +RUN chmod 777 /usr/local/bin/docker-entrypoint.sh + +ENTRYPOINT ["docker-entrypoint.sh"] diff --git a/integrations/debezium/postgres/deploy/docker-entrypoint.sh b/integrations/debezium/postgres/deploy/docker-entrypoint.sh new file mode 100644 index 0000000..c06c55f --- /dev/null +++ b/integrations/debezium/postgres/deploy/docker-entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +cd /deploy + +bash psql_dbz.sh diff --git a/integrations/debezium/postgres/deploy/psql_dbz.sh b/integrations/debezium/postgres/deploy/psql_dbz.sh new file mode 100755 index 0000000..6407cea --- /dev/null +++ b/integrations/debezium/postgres/deploy/psql_dbz.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#Initialize Debezium (Kafka Connect Component) + +while true; do + echo "Waiting for Debezium to be ready" + sleep 0.1 + curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200 + if [ $? -eq 0 ]; then + echo "Debezium is ready" + break + fi +done + +# Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable +sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-postgres.json + +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-postgres.json + +if [ $? 
-eq 0 ]; then + echo "Debezium connector registered" +else + echo "Debezium connector registration failed" + exit 1 +fi diff --git a/integrations/debezium/postgres/deploy/register-postgres.json b/integrations/debezium/postgres/deploy/register-postgres.json new file mode 100644 index 0000000..e359f86 --- /dev/null +++ b/integrations/debezium/postgres/deploy/register-postgres.json @@ -0,0 +1,21 @@ +{ + "name": "postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "plugin.name":"pgoutput", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "postgres", + "database.password": "postgres", + "database.dbname" : "postgres", + "topic.prefix": "pg_repl", + "schema.include.list": "demo", + "publication.autocreate.mode":"filtered", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": "http://EXTERNAL_IP:8081", + "value.converter.schema.registry.url": "http://EXTERNAL_IP:8081", + "value.converter.schemas.enable": false + } +} diff --git a/integrations/debezium/postgres/docker-compose.yaml b/integrations/debezium/postgres/docker-compose.yaml new file mode 100644 index 0000000..61acd6f --- /dev/null +++ b/integrations/debezium/postgres/docker-compose.yaml @@ -0,0 +1,97 @@ +version: '3.9' +services: + redpanda: + image: docker.vectorized.io/vectorized/redpanda:v21.10.1 + container_name: redpanda + command: + - redpanda start + - --overprovisioned + - --smp 1 + - --memory 1G + - --reserve-memory 0M + - --node-id 0 + - --check=false + - --kafka-addr 0.0.0.0:9092 + - --advertise-kafka-addr ${EXTERNAL_IP:-redpanda}:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr ${EXTERNAL_IP:-redpanda}:8082 + - --set redpanda.enable_transactions=true + - --set redpanda.enable_idempotence=true + ports: + - 9092:9092 + - 8081:8081 + - 8082:8082 + healthcheck:
{test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s} + postgres: + build: ./postgres + container_name: postgres + ports: + - 5432:5432 + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=postgres + volumes: + - ${PWD}/postgres/postgres_bootstrap.sql:/docker-entrypoint-initdb.d/postgres_bootstrap.sql + depends_on: + redpanda: {condition: service_healthy} + healthcheck: + test: pg_isready -U postgres + interval: 1s + start_period: 60s + debezium: + #image: debezium/connect:2.4 + build: ./connect + container_name: debezium + environment: + BOOTSTRAP_SERVERS: ${EXTERNAL_IP:-redpanda}:9092 + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: connect_configs + OFFSET_STORAGE_TOPIC: connect_offsets + STATUS_STORAGE_TOPIC: connect_status + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081 + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081 + ports: + - 8083:8083 + healthcheck: {test: curl -f localhost:8083, interval: 1s, start_period: 120s} + depends_on: + redpanda: {condition: service_healthy} + postgres: {condition: service_healthy} + debezium_deploy: + build: ./deploy + depends_on: + redpanda: {condition: service_healthy} + postgres: {condition: service_healthy} + debezium: {condition: service_healthy} + environment: + - EXTERNAL_IP=${EXTERNAL_IP:-redpanda} + datagen1: + image: materialize/datagen:latest + container_name: datagen1 + depends_on: + postgres: {condition: service_healthy} + environment: + POSTGRES_HOST: "postgres" + POSTGRES_PORT: 5432 + POSTGRES_DB: postgres + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + volumes: + - ./schemas:/schemas + entrypoint: + datagen -s /schemas/users.sql -f postgres -n 1000 + datagen2: + image: materialize/datagen:latest + container_name: datagen2 + depends_on: + postgres: {condition: service_healthy} + environment: + POSTGRES_HOST: "postgres" + POSTGRES_PORT: 5432 + POSTGRES_DB: postgres + POSTGRES_USER: 
postgres + POSTGRES_PASSWORD: postgres + volumes: + - ./schemas:/schemas + entrypoint: + datagen -s /schemas/reviews.sql -f postgres -n 100000 -w 1000 diff --git a/integrations/debezium/postgres/postgres/Dockerfile b/integrations/debezium/postgres/postgres/Dockerfile new file mode 100644 index 0000000..db1f985 --- /dev/null +++ b/integrations/debezium/postgres/postgres/Dockerfile @@ -0,0 +1,3 @@ +FROM postgres:11-alpine + +COPY postgres_bootstrap.sql /docker-entrypoint-initdb.d/ diff --git a/integrations/debezium/postgres/postgres/postgres_bootstrap.sql b/integrations/debezium/postgres/postgres/postgres_bootstrap.sql new file mode 100644 index 0000000..3922708 --- /dev/null +++ b/integrations/debezium/postgres/postgres/postgres_bootstrap.sql @@ -0,0 +1,39 @@ +-- Create a schema and set the search path +CREATE SCHEMA demo; +SET search_path TO demo; + +-- Set the wal_level to logical and add the replication role to the postgres user +ALTER SYSTEM SET wal_level = logical; +ALTER ROLE postgres WITH REPLICATION; + +-- Create the table +CREATE TABLE IF NOT EXISTS users ( + id SERIAL PRIMARY KEY, + role_id INT NOT NULL, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL +); + +CREATE TABLE IF NOT EXISTS roles ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL +); + +INSERT INTO roles (name) VALUES ('admin'), ('user'), ('guest'), ('vip'); + +-- Create reviews table +CREATE TABLE IF NOT EXISTS reviews ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL, + rating INT NOT NULL, + comment TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +ALTER TABLE users REPLICA IDENTITY FULL; +ALTER TABLE roles REPLICA IDENTITY FULL; +ALTER TABLE reviews REPLICA IDENTITY FULL; + +CREATE PUBLICATION mz_source FOR TABLE users, roles, reviews; diff --git a/integrations/debezium/postgres/schemas/reviews.sql b/integrations/debezium/postgres/schemas/reviews.sql new file mode 100644
index 0000000..bc83272 --- /dev/null +++ b/integrations/debezium/postgres/schemas/reviews.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS demo.reviews ( + id SERIAL PRIMARY KEY, + user_id INT COMMENT 'faker.datatype.number({min: 1, max: 1000})', + rating INT COMMENT 'faker.datatype.number({min: 1, max: 10})', + comment TEXT COMMENT 'faker.lorem.paragraph()', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); diff --git a/integrations/debezium/postgres/schemas/users.sql b/integrations/debezium/postgres/schemas/users.sql new file mode 100644 index 0000000..9d61e42 --- /dev/null +++ b/integrations/debezium/postgres/schemas/users.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS demo.users ( + "id" INT COMMENT 'iteration.index', + "role_id" INT COMMENT 'faker.datatype.number({min: 1, max: 4})', + "first_name" varchar COMMENT 'faker.internet.userName()', + "last_name" VARCHAR COMMENT 'faker.internet.userName()', + "email" VARCHAR COMMENT 'faker.internet.email()' +); diff --git a/integrations/debezium/sqlserver/README.md b/integrations/debezium/sqlserver/README.md new file mode 100644 index 0000000..7a12638 --- /dev/null +++ b/integrations/debezium/sqlserver/README.md @@ -0,0 +1,139 @@ +# Debezium + SQL Server + Materialize + +Before trying this out, you will need the following: + +- [Materialize account](https://materialize.com/register/). +- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. + +## Running the demo + +If you want to try it right now, follow these steps: + +1. Clone the project on your Linux server and run: + + ```shell session + git clone https://github.com/MaterializeInc/demos.git + cd demos/integrations/debezium/sqlserver + ``` + +1. Start all containers: + + ```shell + export EXTERNAL_IP=$(hostname -I | awk '{print $1}') + docker compose up -d --build + ``` + +1. 
Check the status of the containers: + + ```shell + docker compose ps + ``` + + +1. Exec into the Redpanda container to look around using Redpanda's [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI: + + ```shell session + docker compose exec redpanda /bin/bash + + rpk debug info + + rpk topic list + ``` + +1. Connect to Materialize. + + If you already have `psql` installed on your machine, use the provided connection string to connect: + + Example: + + ```shell session + psql "postgres://user%40domain.com@materialize_host:6875/materialize" + ``` + + Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools). + +1. Now that you're in Materialize, define the connection to the Redpanda broker and the schema registry: + + ```sql + -- Create Redpanda connection + CREATE CONNECTION redpanda_connection + TO KAFKA ( + BROKER ''); + + -- Create Registry connection + CREATE CONNECTION schema_registry + TO CONFLUENT SCHEMA REGISTRY ( + URL 'http://'); + ``` + +1.
Next, define all of the topics as sources: + + ```sql + CREATE SOURCE customers + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'server1.testDB.dbo.customers') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE orders + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'server1.testDB.dbo.orders') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE products + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'server1.testDB.dbo.products') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + + CREATE SOURCE products_on_hand + FROM KAFKA CONNECTION redpanda_connection (TOPIC 'server1.testDB.dbo.products_on_hand') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry + ENVELOPE DEBEZIUM + WITH (SIZE = '3xsmall'); + ``` + +1. Subscribe to the `orders` source: + + ```sql + COPY ( SUBSCRIBE TO orders ) TO STDOUT; + ``` + +1. Next, generate some orders: + + ```sh + cat sqlserver/orders.sql | docker compose -f docker-compose.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD' + ``` + +1.
Modify records in the database using the SQL Server client (remember to add a `GO` command to execute each statement): + + ```sh + docker compose -f docker-compose.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB' + ``` + + +## Cleanup + +To stop the services and remove the containers, run: + +```shell session +docker compose down +``` + +In Materialize, run: + +```sql +DROP CONNECTION redpanda_connection CASCADE; +DROP CONNECTION schema_registry CASCADE; +``` + +## Helpful resources: + +* [`CREATE CONNECTION`](https://materialize.com/docs/sql/create-connection/) +* [`CREATE SOURCE`](https://materialize.com/docs/sql/create-source) +* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view) + +## Community + +If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)! diff --git a/integrations/debezium/sqlserver/connect/Dockerfile b/integrations/debezium/sqlserver/connect/Dockerfile new file mode 100644 index 0000000..e96f8b5 --- /dev/null +++ b/integrations/debezium/sqlserver/connect/Dockerfile @@ -0,0 +1,49 @@ +FROM debezium/connect-base:2.4 + +# +# Set up the plugins directory ...
+# +ENV CONFLUENT_VERSION=7.0.1 \ + AVRO_VERSION=1.10.1 \ + GUAVA_VERSION=31.0.1-jre + +RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \ + docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \ + docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \ + docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \ + docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \ + docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \ + docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \ + docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \ + docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7 + +# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile +LABEL maintainer="Debezium Community" + +ENV DEBEZIUM_VERSION="2.4.0.Final" \ + MAVEN_REPO_CENTRAL="" \ + MAVEN_REPOS_ADDITIONAL="" \ + MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \ + MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \ + MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \ + POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \ + SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \ + ORACLE_MD5=21699814400860457dc2334b165882e6 \ + DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \ + SPANNER_MD5=186b07595e914e9139941889fd675044 \ + VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \ + JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \ + KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \ + SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957 + +RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \ + 
docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \ + docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \ + docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \ + docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \ + docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \ + docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \ + docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \ + docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \ + docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \ + docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5" diff --git a/integrations/debezium/sqlserver/deploy/Dockerfile b/integrations/debezium/sqlserver/deploy/Dockerfile new file mode 100644 index 0000000..4c0db5a --- /dev/null +++ b/integrations/debezium/sqlserver/deploy/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:latest + +RUN apt-get update && apt-get -qy install curl + +COPY . 
/deploy + +COPY docker-entrypoint.sh /usr/local/bin +RUN chmod 777 /usr/local/bin/docker-entrypoint.sh + +ENTRYPOINT ["docker-entrypoint.sh"] diff --git a/integrations/debezium/sqlserver/deploy/docker-entrypoint.sh b/integrations/debezium/sqlserver/deploy/docker-entrypoint.sh new file mode 100644 index 0000000..6744ba9 --- /dev/null +++ b/integrations/debezium/sqlserver/deploy/docker-entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +cd /deploy + +bash sqlserver_dbz.sh diff --git a/integrations/debezium/sqlserver/deploy/register-sqlserver.json b/integrations/debezium/sqlserver/deploy/register-sqlserver.json new file mode 100644 index 0000000..6b8cae7 --- /dev/null +++ b/integrations/debezium/sqlserver/deploy/register-sqlserver.json @@ -0,0 +1,20 @@ +{ + "name": "inventory-connector", + "config": { + "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector", + "tasks.max" : "1", + "topic.prefix" : "server1", + "database.hostname" : "sqlserver", + "database.port" : "1433", + "database.user" : "sa", + "database.password" : "Password!", + "database.names" : "testDB", + "schema.history.internal.kafka.bootstrap.servers" : "EXTERNAL_IP:9092", + "schema.history.internal.kafka.topic": "schema-changes.inventory", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": "http://redpanda:8081", + "value.converter.schema.registry.url": "http://redpanda:8081", + "database.encrypt": "false" + } +} diff --git a/integrations/debezium/sqlserver/deploy/sqlserver_dbz.sh b/integrations/debezium/sqlserver/deploy/sqlserver_dbz.sh new file mode 100755 index 0000000..45f2e62 --- /dev/null +++ b/integrations/debezium/sqlserver/deploy/sqlserver_dbz.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +#Initialize Debezium (Kafka Connect Component) + +while true; do + echo "Waiting for Debezium to be ready" + sleep 0.1 + curl -s -o /dev/null -w "%{http_code}" 
http://debezium:8083/connectors/ | grep 200 + if [ $? -eq 0 ]; then + echo "Debezium is ready" + break + fi +done + +# Replace the EXTERNAL_IP placeholder in the connector config with the value of the EXTERNAL_IP environment variable, then register the connector +sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-sqlserver.json + +curl -i --fail -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-sqlserver.json + +if [ $? -eq 0 ]; then + echo "Debezium connector registered" +else + echo "Debezium connector registration failed" + exit 1 +fi diff --git a/integrations/debezium/sqlserver/docker-compose.yaml b/integrations/debezium/sqlserver/docker-compose.yaml new file mode 100644 index 0000000..df9285d --- /dev/null +++ b/integrations/debezium/sqlserver/docker-compose.yaml @@ -0,0 +1,63 @@ +version: '2' +services: + redpanda: + image: docker.vectorized.io/vectorized/redpanda:v21.10.1 + container_name: redpanda + command: + - redpanda start + - --overprovisioned + - --smp 1 + - --memory 1G + - --reserve-memory 0M + - --node-id 0 + - --check=false + - --kafka-addr 0.0.0.0:9092 + - --advertise-kafka-addr ${EXTERNAL_IP:-redpanda}:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr ${EXTERNAL_IP:-redpanda}:8082 + - --set redpanda.enable_transactions=true + - --set redpanda.enable_idempotence=true + ports: + - 9092:9092 + - 8081:8081 + - 8082:8082 + healthcheck: {test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s} + sqlserver: + # image: mcr.microsoft.com/mssql/server:2019-latest + build: ./sqlserver + ports: + - 1433:1433 + environment: + - ACCEPT_EULA=Y + - MSSQL_PID=Standard + - SA_PASSWORD=Password! 
+ - MSSQL_AGENT_ENABLED=true + healthcheck: + test: /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$$SA_PASSWORD" -Q "SELECT 1" || exit 1 + interval: 10s + timeout: 3s + retries: 10 + start_period: 10s + debezium: + build: ./connect + container_name: debezium + ports: + - 8083:8083 + environment: + - BOOTSTRAP_SERVERS=${EXTERNAL_IP:-redpanda}:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=my_connect_configs + - OFFSET_STORAGE_TOPIC=my_connect_offsets + - STATUS_STORAGE_TOPIC=my_connect_statuses + healthcheck: {test: curl -f localhost:8083, interval: 1s, start_period: 120s} + depends_on: + redpanda: {condition: service_healthy} + sqlserver: {condition: service_healthy} + debezium_deploy: + build: ./deploy + depends_on: + redpanda: {condition: service_healthy} + sqlserver: {condition: service_healthy} + debezium: {condition: service_healthy} + environment: + - EXTERNAL_IP=${EXTERNAL_IP:-redpanda} diff --git a/integrations/debezium/sqlserver/sqlserver/Dockerfile b/integrations/debezium/sqlserver/sqlserver/Dockerfile new file mode 100644 index 0000000..4a93e06 --- /dev/null +++ b/integrations/debezium/sqlserver/sqlserver/Dockerfile @@ -0,0 +1,10 @@ +FROM mcr.microsoft.com/mssql/server:2019-latest + +# Copy your SQL script into the image +COPY ./inventory.sql / + +# Create an entrypoint script +COPY ./entrypoint.sh / + +# Set the entrypoint script to run on container startup +ENTRYPOINT ["/entrypoint.sh"] diff --git a/integrations/debezium/sqlserver/sqlserver/entrypoint.sh b/integrations/debezium/sqlserver/sqlserver/entrypoint.sh new file mode 100755 index 0000000..3ec4605 --- /dev/null +++ b/integrations/debezium/sqlserver/sqlserver/entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# Start SQL Server +/opt/mssql/bin/sqlservr & + +# Wait for SQL Server to start up +echo "Waiting for SQL Server to start up..." +sleep 25 + +# Run the SQL script +echo "Running the SQL script..." 
+/opt/mssql-tools/bin/sqlcmd -U sa -P "$SA_PASSWORD" -i /inventory.sql + +# Keep the container running +tail -f /dev/null diff --git a/integrations/debezium/sqlserver/sqlserver/inventory.sql b/integrations/debezium/sqlserver/sqlserver/inventory.sql new file mode 100644 index 0000000..ae5839d --- /dev/null +++ b/integrations/debezium/sqlserver/sqlserver/inventory.sql @@ -0,0 +1,84 @@ +-- Create the test database +CREATE DATABASE testDB; +GO +USE testDB; +EXEC sys.sp_cdc_enable_db; + +-- Create and populate our products using one insert per row +CREATE TABLE products ( + id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT +); +INSERT INTO products(name,description,weight) + VALUES ('scooter','Small 2-wheel scooter',3.14); +INSERT INTO products(name,description,weight) + VALUES ('car battery','12V car battery',8.1); +INSERT INTO products(name,description,weight) + VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO products(name,description,weight) + VALUES ('hammer','12oz carpenter''s hammer',0.75); +INSERT INTO products(name,description,weight) + VALUES ('hammer','14oz carpenter''s hammer',0.875); +INSERT INTO products(name,description,weight) + VALUES ('hammer','16oz carpenter''s hammer',1.0); +INSERT INTO products(name,description,weight) + VALUES ('rocks','box of assorted rocks',5.3); +INSERT INTO products(name,description,weight) + VALUES ('jacket','water resistent black wind breaker',0.1); +INSERT INTO products(name,description,weight) + VALUES ('spare tire','24 inch spare tire',22.2); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0; +-- Create and populate the products on hand using multiple inserts +CREATE TABLE products_on_hand ( + product_id INTEGER NOT NULL PRIMARY KEY, + quantity INTEGER NOT NULL, + FOREIGN KEY (product_id) REFERENCES products(id) 
+); +INSERT INTO products_on_hand VALUES (101,3); +INSERT INTO products_on_hand VALUES (102,8); +INSERT INTO products_on_hand VALUES (103,18); +INSERT INTO products_on_hand VALUES (104,4); +INSERT INTO products_on_hand VALUES (105,5); +INSERT INTO products_on_hand VALUES (106,0); +INSERT INTO products_on_hand VALUES (107,44); +INSERT INTO products_on_hand VALUES (108,2); +INSERT INTO products_on_hand VALUES (109,5); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products_on_hand', @role_name = NULL, @supports_net_changes = 0; +-- Create some customers ... +CREATE TABLE customers ( + id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE +); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Sally','Thomas','sally.thomas@acme.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('George','Bailey','gbailey@foobar.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Edward','Walker','ed@walker.com'); +INSERT INTO customers(first_name,last_name,email) + VALUES ('Anne','Kretchmar','annek@noanswer.org'); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0; +-- Create some very simple orders +CREATE TABLE orders ( + id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY (purchaser) REFERENCES customers(id), + FOREIGN KEY (product_id) REFERENCES products(id) +); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('16-JAN-2016', 1001, 1, 102); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('17-JAN-2016', 1002, 2, 105); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('19-FEB-2016', 1002, 2, 106); +INSERT INTO 
orders(order_date,purchaser,quantity,product_id) + VALUES ('21-FEB-2016', 1003, 1, 107); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0; +GO diff --git a/integrations/debezium/sqlserver/sqlserver/orders.sql b/integrations/debezium/sqlserver/sqlserver/orders.sql new file mode 100644 index 0000000..8cca657 --- /dev/null +++ b/integrations/debezium/sqlserver/sqlserver/orders.sql @@ -0,0 +1,19 @@ +GO +USE testDB; +EXEC sys.sp_cdc_enable_db; + +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('16-JAN-2016', 1001, 1, 102); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('17-JAN-2016', 1002, 2, 105); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('19-FEB-2016', 1002, 2, 106); +INSERT INTO orders(order_date,purchaser,quantity,product_id) + VALUES ('21-FEB-2016', 1003, 1, 107); + +-- Update 4 rows in the orders table +UPDATE orders SET quantity = 5 WHERE id IN (SELECT TOP 4 id FROM orders ORDER BY id); + +-- Delete 4 rows from the orders table +DELETE FROM orders WHERE id IN (SELECT TOP 4 id FROM orders ORDER BY id); +GO