Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Debezium CDC examples for Postgres & SQL Server & MySQL #93

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions integrations/debezium/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Materialize + Debezium Examples

## Overview

This is a collection of demos that show how to use the [Debezium](https://materialize.com/docs/ingest-data/debezium/) connector with Materialize.

## Demos

| Demo | Description | Materialize Docs |
| ---------------------------------- | --------------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| [Postgres](postgres) | Connect to a Postgres database and stream changes to Kafka/Redpanda | [Postgres](https://materialize.com/docs/ingest-data/cdc-postgres-kafka-debezium/) |
| [MySQL](mysql) | Connect to a MySQL database and stream changes to Kafka/Redpanda | [MySQL](https://materialize.com/docs/ingest-data/cdc-mysql/) |
| [SQL server](sqlserver) | Connect to a SQL server database and stream changes to Kafka/Redpanda | TODO |
<!-- | TODO: [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | TODO | -->

## Prerequisites

- [Docker](https://docs.docker.com/get-docker/)
- [Docker Compose](https://docs.docker.com/compose/install/)
- [Materialize](https://console.materialize.com/) account

## Running the demos

For each demo, follow the instructions in the demo's README. All demos assume that you have `psql`, `docker` and a publicly accessible Linux environment.

## Notes

Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:

- `kafka-connect-avro-converter`

- `kafka-connect-avro-data`

- `kafka-avro-serializer`

- `kafka-schema-serializer`

- `kafka-schema-registry-client`

- `common-config`

- `common-utils`
Empty file.
179 changes: 179 additions & 0 deletions integrations/debezium/mysql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Debezium + MySQL + Materialize

Before trying this out, you will need the following:

- [Materialize account](https://materialize.com/register/).
- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed.

## Running the demo

If you want to try it right now, follow these steps:

1. Clone the project on your Linux server and run:

```shell session
git clone https://github.com/MaterializeInc/demos.git
cd demos/integrations/debezium/mysql
```

1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example:

```shell session
export EXTERNAL_IP=$(hostname -I | awk '{print $1}')

# Check the value of EXTERNAL_IP
echo $EXTERNAL_IP
```

1. Bring up the Docker Compose containers in the background.

```shell session
docker compose up -d --build
```

**This may take one or two minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers, with Debezium configured to ship changes from MySQL into Redpanda.

1. Confirm that everything is running as expected:
bobbyiliev marked this conversation as resolved.
Show resolved Hide resolved

```shell session
docker compose ps
```

1. Exec in to the redpanda container to look around using redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI.

```shell session
docker compose exec redpanda /bin/bash

rpk debug info

rpk topic list
```
1. Connect to Materialize

If you already have `psql` installed on your machine, use the provided connection string to connect:

Example:

```shell session
psql "postgres://user%40domain.com@materialize_host:6875/materialize"
```

Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools).

1. Now that you're in the Materialize, define the connection to the Redpanda broker and the schema registry:
bobbyiliev marked this conversation as resolved.
Show resolved Hide resolved

```sql
-- Create Redpanda connection
CREATE CONNECTION redpanda_connection
TO KAFKA (
BROKER '<your_server_ip:9092>');

-- Create Registry connection
CREATE CONNECTION schema_registry
TO CONFLUENT SCHEMA REGISTRY (
URL 'http://<your_server_ip:8081>');
```

1. Next, define all of the tables in `demo` as sources:

```sql
CREATE SOURCE users
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.users')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE roles
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.roles')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE reviews
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.reviews')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
```

Because the three sources are pulling message schema data from the registry, materialize knows the column types to use for each attribute.

1. Create a materialized view that has only the VIP users:

```sql
CREATE MATERIALIZED VIEW vip_users AS
SELECT
users.id,
users.name,
users.email,
users.role_id,
roles.name AS role_name
FROM users
JOIN roles ON users.role_id = roles.id
WHERE users.role_id = 4;
```

1. Create a materialized view that has only the bad reviews:

```sql
CREATE MATERIALIZED VIEW bad_reviews AS
SELECT
reviews.user_id,
reviews.comment,
reviews.rating,
reviews.created_at,
reviews.updated_at
FROM reviews
WHERE reviews.rating < 4;
```

1. Create a materialized view that filters all VIP users with bad reviews:

```sql
CREATE MATERIALIZED VIEW vip_users_with_bad_reviews AS
SELECT
vip_users.name,
vip_users.email,
vip_users.role_name,
bad_reviews.rating,
bad_reviews.comment
FROM vip_users
JOIN bad_reviews ON vip_users.id = bad_reviews.user_id;
```

1. Query the materialized view:

```sql
SELECT * FROM vip_users_with_bad_reviews;
```

Or use the `SUBSCRIBE` command to stream the results:

```sql
COPY (SUBSCRIBE vip_users_with_bad_reviews) TO STDOUT;
```

## Cleanup

To stop the services and remove the containers, run:

```shell session
docker compose down
```

In Materialize, run:

```sql
DROP CONNECTION redpanda_connection CASCADE;
DROP CONNECTION schema_registry CASCADE;
```

## Helpful resources:

* [`CREATE CONNECTION`](https://materialize.com/docs/sql/create-connection/)
* [`MySQL CDC using Kafka and Debezium`](https://materialize.com/docs/ingest-data/cdc-mysql/)
* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view)

## Community

If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)!
49 changes: 49 additions & 0 deletions integrations/debezium/mysql/connect/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
FROM debezium/connect-base:2.4

#
# Set up the plugins directory ...
#
ENV CONFLUENT_VERSION=7.0.1 \
AVRO_VERSION=1.10.1 \
GUAVA_VERSION=31.0.1-jre

RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \
docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \
docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \
docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \
docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \
docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \
docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \
docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \
docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7

# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile
LABEL maintainer="Debezium Community"

ENV DEBEZIUM_VERSION="2.4.0.Final" \
MAVEN_REPO_CENTRAL="" \
MAVEN_REPOS_ADDITIONAL="" \
MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \
MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \
MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \
POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \
SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \
ORACLE_MD5=21699814400860457dc2334b165882e6 \
DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \
SPANNER_MD5=186b07595e914e9139941889fd675044 \
VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \
JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \
KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \
SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957

RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \
docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \
docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \
docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \
docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \
docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \
docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \
docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \
docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \
docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \
docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5"
10 changes: 10 additions & 0 deletions integrations/debezium/mysql/deploy/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM ubuntu:latest

RUN apt-get update && apt-get -qy install curl mysql-client

COPY . /deploy

COPY docker-entrypoint.sh /usr/local/bin
RUN chmod 777 /usr/local/bin/docker-entrypoint.sh

ENTRYPOINT ["docker-entrypoint.sh"]
7 changes: 7 additions & 0 deletions integrations/debezium/mysql/deploy/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

cd /deploy

bash mysql_dbz.sh
66 changes: 66 additions & 0 deletions integrations/debezium/mysql/deploy/mysql_dbz.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash

#Initialize Debezium (Kafka Connect Component)

while true; do
echo "Waiting for Debezium to be ready"
sleep 0.1
curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200
if [ $? -eq 0 ]; then
echo "Debezium is ready"
break
fi
done

# Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable
sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mysql.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mysql.json

if [ $? -eq 0 ]; then
echo "Debezium connector registered"
else
echo "Debezium connector registration failed"
exit 1
fi

##
# Reviews generation mock script
# Table details:
# - name: reviews
# - columns:
# - id
# - user_id
# - review_text
# - review_rating
# - created_at
# - updated_at
##

# Start generating reviews
echo "Generating reviews..."
id=1
while [[ true ]] ; do

# Define variables
user_role=$(seq 1 4 | sort -R | head -n1)
review_rating=$(seq 1 10 | sort -R | head -n1)
review_text="Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

echo "Generating review for user_id: ${id}"

# Generate users
# Assuming you have added name and role_id columns to your demo.users table
mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.users (id, name, email, role_id) VALUES ( ${id}, 'user${id}', 'user${id}@demo.com', ${user_role} );" 2> /dev/null

# Generate reviews
# Assuming you have updated column names in your demo.reviews table
mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.reviews (user_id, comment, rating, created_at, updated_at) VALUES ( ${id}, '${review_text}', ${review_rating}, NOW(), NOW() );" 2> /dev/null

# Increment id
((id=id+1))

# Sleep for 1 second
sleep 1

done
23 changes: 23 additions & 0 deletions integrations/debezium/mysql/deploy/register-mysql.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "mysqlpwd",
"database.server.id":"223344",
"topic.prefix": "mysql_repl",
"database.include.list": "demo",
"database.history.kafka.topic":"mysql_repl.history",
"database.history.kafka.bootstrap.servers":"EXTERNAL_IP:9092",
"schema.history.internal.kafka.bootstrap.servers": "EXTERNAL_IP:9092",
"schema.history.internal.kafka.topic": "mysql_repl.internal.history",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://redpanda:8081",
"value.converter.schema.registry.url": "http://redpanda:8081",
"include.schema.changes": false
}
}
Loading