Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Debezium CDC examples for Postgres & SQL Server & MySQL #93

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions integrations/debezium/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Materialize + Debezium Examples

## Overview

This is a collection of demos that show how to use the [Debezium](https://materialize.com/docs/ingest-data/debezium/) connector with Materialize.

## Demos

| Demo | Description | Materialize Docs |
| ---------------------------------- | --------------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
| [Postgres](postgres) | Connect to a Postgres database and stream changes to Kafka/Redpanda | [Postgres](https://materialize.com/docs/ingest-data/cdc-postgres-kafka-debezium/) |
| [MySQL](mysql) | Connect to a MySQL database and stream changes to Kafka/Redpanda | [MySQL](https://materialize.com/docs/ingest-data/cdc-mysql/) |
| [SQL server](sqlserver) | Connect to a SQL server database and stream changes to Kafka/Redpanda | TODO |
| [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | WIP: This demo is not yet complete. |

## Prerequisites

- [Docker](https://docs.docker.com/get-docker/)
- [Docker Compose](https://docs.docker.com/compose/install/)
- [Materialize](https://console.materialize.com/) account

## Running the demos

For each demo, follow the instructions in the demo's README. All demos assume that you have `psql`, `docker` and a publicly accessible Linux environment.

## Notes

Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:

- `kafka-connect-avro-converter`

- `kafka-connect-avro-data`

- `kafka-avro-serializer`

- `kafka-schema-serializer`

- `kafka-schema-registry-client`

- `common-config`

- `common-utils`
180 changes: 180 additions & 0 deletions integrations/debezium/mongodb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# Debezium + MongoDB + Materialize

> {notice}: WIP: This demo is a work in progress. It is not yet ready for use.

- When trying to create a source with envelope debezium, the following error is thrown:

```sql
ERROR: 'before' column must be of type record
```

An example of the records generated:

- Insert:
```sql
-[ RECORD 1 ]-----+------------------------------------------------------------------------------------------------------------------
id | 102
before |
after | {"_id":{"$numberLong":"102"},"description":"12V car battery","name":"car battery","quantity":8,"weight":8.1}
updateDescription |
source | (2.4.0.Final,mongodb,dbserver1,0,true,inventory,,rs0,products,-1,,,)
op | r
ts_ms | 1698676832093
transaction |
```

- Update:
```sql
-[ RECORD 2 ]-----+------------------------------------------------------------------------------------------------------------------
id | 101
before | {"_id":{"$numberLong":"101"},"description":"Small 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14}
after | {"_id":{"$numberLong":"101"},"description":"Updated 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14}
updateDescription | (,"{""description"":""Updated 2-wheel scooter""}",)
source | (2.4.0.Final,mongodb,dbserver1,1698676924000,false,inventory,,rs0,products,1,,,1698676924686)
op | u
ts_ms | 1698676924707
transaction |
```

---

Before trying this out, you will need the following:

- [Materialize account](https://materialize.com/register/).
- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed.

## Running the demo

If you want to try it right now, follow these steps:

1. Clone the project on your Linux server and run:

```shell session
git clone https://github.com/MaterializeInc/demos.git
cd demos/integrations/debezium/mongodb
```

1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example:

```shell session
export EXTERNAL_IP=$(hostname -I | awk '{print $1}')

# Check the value of EXTERNAL_IP
echo $EXTERNAL_IP
```

1. Bring up only the MongoDB container in the background.

```shell session
docker compose up -d --build mongodb
```

1. Initialize the MongoDB replica set.

```shell session
docker compose exec mongodb bash -c "/usr/local/bin/init-inventory.sh"
```

1. Bring up the rest of the Docker containers in the background.

```shell session
docker compose up -d --build
```

**This may take one or two minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers, with Debezium configured to ship changes from Mongo into Redpanda.

1. Confirm that everything is running as expected:

```shell session
docker compose ps
```

1. Exec in to the redpanda container to look around using redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI.

```shell session
docker compose exec redpanda /bin/bash

rpk debug info

rpk topic list
```
1. Connect to Materialize

If you already have `psql` installed on your machine, use the provided connection string to connect:

Example:

```shell session
psql "postgres://user%40domain.com@materialize_host:6875/materialize"
```

Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools).

1. Now that you're in the Materialize, define the connection to the Redpanda broker and the schema registry:

```sql
-- Create Redpanda connection
CREATE CONNECTION redpanda_connection
TO KAFKA (
BROKER '<your_server_ip:9092>');

-- Create Registry connection
CREATE CONNECTION schema_registry
TO CONFLUENT SCHEMA REGISTRY (
URL 'http://<your_server_ip:8081>');
```

1. Next, define all of the tables in `demo` as sources:

```sql
CREATE SOURCE customers
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.customers')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE orders
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE products
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.products')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
```

Because the three sources are pulling message schema data from the registry, materialize knows the column types to use for each attribute.


1. Select from one of sources to see the data:

```sql
SELECT * FROM customers LIMIT 5;
```

## Cleanup

To stop the services and remove the containers, run:

```shell session
docker compose down
```

In Materialize, run:

```sql
DROP CONNECTION redpanda_connection CASCADE;
DROP CONNECTION schema_registry CASCADE;
```

## Helpful resources:

* [`CREATE SOURCE`](https://materialize.com/docs/sql/create-source)
* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view)

## Community

If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)!
49 changes: 49 additions & 0 deletions integrations/debezium/mongodb/connect/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
FROM debezium/connect-base:2.4

#
# Set up the plugins directory ...
#
ENV CONFLUENT_VERSION=7.0.1 \
AVRO_VERSION=1.10.1 \
GUAVA_VERSION=31.0.1-jre

RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \
docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \
docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \
docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \
docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \
docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \
docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \
docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \
docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7

# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile
LABEL maintainer="Debezium Community"

ENV DEBEZIUM_VERSION="2.4.0.Final" \
MAVEN_REPO_CENTRAL="" \
MAVEN_REPOS_ADDITIONAL="" \
MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \
MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \
MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \
POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \
SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \
ORACLE_MD5=21699814400860457dc2334b165882e6 \
DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \
SPANNER_MD5=186b07595e914e9139941889fd675044 \
VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \
JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \
KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \
SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957

RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \
docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \
docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \
docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \
docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \
docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \
docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \
docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \
docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \
docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \
docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5"
10 changes: 10 additions & 0 deletions integrations/debezium/mongodb/deploy/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM ubuntu:latest

RUN apt-get update && apt-get -qy install curl

COPY . /deploy

COPY docker-entrypoint.sh /usr/local/bin
RUN chmod 777 /usr/local/bin/docker-entrypoint.sh

ENTRYPOINT ["docker-entrypoint.sh"]
7 changes: 7 additions & 0 deletions integrations/debezium/mongodb/deploy/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

cd /deploy

bash mongo_dbz.sh
25 changes: 25 additions & 0 deletions integrations/debezium/mongodb/deploy/mongo_dbz.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

#Initialize Debezium (Kafka Connect Component)

while true; do
echo "Waiting for Debezium to be ready"
sleep 0.1
curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200
if [ $? -eq 0 ]; then
echo "Debezium is ready"
break
fi
done

# Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable
sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mongodb.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mongodb.json

if [ $? -eq 0 ]; then
echo "Debezium connector registered"
else
echo "Debezium connector registration failed"
exit 1
fi
18 changes: 18 additions & 0 deletions integrations/debezium/mongodb/deploy/register-mongodb.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"topic.prefix" : "dbserver1",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
"capture.mode": "change_streams_update_full_with_pre_image",
"database.include.list" : "inventory",
"database.history.kafka.bootstrap.servers": "138.68.72.112:9092",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://138.68.72.112:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://138.68.72.112:8081"
}
}
Loading