title | summary | toc |
---|---|---|
Change Data Capture (CDC) | Change data capture (CDC) provides efficient, distributed, row-level change subscriptions. | true |
Change data capture (CDC) provides efficient, distributed, row-level change feeds into Apache Kafka for downstream processing such as reporting, caching, or full-text indexing.
While CockroachDB is an excellent system of record, it also needs to coexist with other systems. For example, you might want to keep your data mirrored in full-text indexes, analytics engines, or big data pipelines.
The core feature of CDC is the changefeed. Changefeeds target a whitelist of tables, called the "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka).
- In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee.
- Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a new change for that row at an earlier timestamp.
For example, if you ran the following:
> CREATE TABLE foo (id INT PRIMARY KEY DEFAULT unique_rowid(), name STRING);
> CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://localhost:9092' WITH UPDATED;
> INSERT INTO foo VALUES (1, 'Carl');
> UPDATE foo SET name = 'Petee' WHERE id = 1;
You'd expect the changefeed to emit:
[1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"} [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
It is also possible that the changefeed emits an out of order duplicate of an earlier value that you already saw:
[1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"} [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"} [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
However, you will never see an output like the following (i.e., an out of order row that you've never seen before):
[1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"} [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
- If a row is modified more than once in the same transaction, only the last change will be emitted.
- Rows are sharded between Kafka partitions by the row's primary key.
- The `UPDATED` option adds an "updated" timestamp to each emitted row. You can also use the `RESOLVED` option to emit periodic "resolved" timestamp messages to each Kafka partition. A "resolved" timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. For example (a statement that produces output in this shape is sketched just after this list):
{"__crdb__": {"updated": "1532377312562986715.0000000000"}, "id": 1, "name": "Petee H"} {"__crdb__": {"updated": "1532377306108205142.0000000000"}, "id": 2, "name": "Carl"} {"__crdb__": {"updated": "1532377358501715562.0000000000"}, "id": 3, "name": "Ernie"} {"__crdb__":{"resolved":"1532379887442299001.0000000000"}} {"__crdb__":{"resolved":"1532379888444290910.0000000000"}} {"__crdb__":{"resolved":"1532379889448662988.0000000000"}} ... {"__crdb__":{"resolved":"1532379922512859361.0000000000"}} {"__crdb__": {"updated": "1532379923319195777.0000000000"}, "id": 4, "name": "Lucky"}
- With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. Resolved timestamp notifications on every Kafka partition can be used to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures.
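For reference, output in the shape shown above comes from a changefeed created with both options. A minimal sketch, reusing the `office_dogs` table and Kafka sink from the examples later on this page (the `10s` resolved interval is an arbitrary choice):

> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH updated, resolved='10s'; -- sketch: '10s' is an illustrative resolved interval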
Because CockroachDB supports transactions that can affect any part of the cluster, it is not possible to horizontally divide the transaction log into independent changefeeds.
When schema changes with column backfill (e.g., adding a column with a default, adding a computed column, adding a `NOT NULL` column, dropping a column) are made to watched rows, the changefeed will emit some duplicates during the backfill. When it finishes, CockroachDB outputs all watched rows using the new schema.
Rows that have been backfilled by a schema change are always re-emitted because Avro's default schema change functionality is not powerful enough to represent the schema changes that CockroachDB supports (e.g., CockroachDB columns can have default values that are arbitrary SQL expressions, but Avro only supports static default values).
To ensure that the Avro schemas that CockroachDB publishes will work with the (undocumented and inconsistent) schema compatibility rules used by the Confluent schema registry, CockroachDB emits all fields in Avro as nullable unions. This ensures that Avro and Confluent consider the schemas to be both backward- and forward-compatible. Note that the original CockroachDB column definition is also included in the schema as a doc field, so it's still possible to distinguish between a `NOT NULL` CockroachDB column and a `NULL` CockroachDB column.
For an example of a schema change with column backfill, start with the changefeed created in the example below:
[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
Add a column to the watched table:
{% include copy-clipboard.html %}
> ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;
The changefeed emits duplicate records 1, 2, and 3 before outputting the records using the new schema:
[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"} # Duplicate
[2] {"id": 2, "name": "Carl"} # Duplicate
[3] {"id": 3, "name": "Ernie"} # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}
New in v19.1: Previously created changefeeds collect changes by periodically sending a request for any recent changes. Newly created changefeeds behave differently: they use a long-lived request (i.e., a rangefeed), which pushes changes as they happen. This reduces the latency of row changes, as well as transaction restarts on tables being watched by a changefeed for some workloads.
To enable rangefeeds, set the `kv.rangefeed.enabled` cluster setting to `true`. Any created changefeed will error until this setting is enabled. Note that enabling rangefeeds currently has a small performance cost (about a 5-10% increase in latencies), whether or not the rangefeed is being used in a changefeed.
If you are experiencing an issue, you can revert back to the previous behavior by setting `changefeed.push.enabled` to `false`. Note that this setting will be removed in a future release; if you have to use the fallback, please file a GitHub issue.
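For reference, the fallback is an ordinary cluster setting change; a minimal sketch:

> SET CLUSTER SETTING changefeed.push.enabled = false; -- fallback only; this setting is slated for removal in a future release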
{{site.data.alerts.callout_info}}
To enable rangefeeds for an existing changefeed, you must also restart the changefeed. For an enterprise changefeed, pause and resume the changefeed. For a core changefeed, cut the connection (CTRL+C) and reconnect using the `cursor` option.
{{site.data.alerts.end}}
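As a sketch of what that restart looks like, an enterprise changefeed is paused and resumed by job ID, and a core changefeed can be reconnected from a timestamp with the `cursor` option (the `job_id` and `<timestamp>` below are placeholders, not values to copy):

> PAUSE JOB job_id;   -- enterprise: pause, then...
> RESUME JOB job_id;  -- ...resume to pick up the new behavior
> EXPERIMENTAL CHANGEFEED FOR foo WITH cursor='<timestamp>'; -- core: reconnect from a timestamp you have already seen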
The `kv.closed_timestamp.target_duration` cluster setting can be used with push changefeeds. Resolved timestamps will always be behind by at least this setting's duration; however, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance.
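For example, lowering the duration is a single cluster setting change; a sketch, where `1s` is only an illustrative value subject to the trade-off described above:

> SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; -- '1s' is an illustrative value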
New in v19.1: To create a core changefeed:
{% include copy-clipboard.html %}
> EXPERIMENTAL CHANGEFEED FOR name;
For more information, see `CHANGEFEED FOR`.
To create a changefeed:
{% include copy-clipboard.html %}
> CREATE CHANGEFEED FOR TABLE name INTO 'schema://host:port';
For more information, see `CREATE CHANGEFEED`.
To pause a changefeed:
{% include copy-clipboard.html %}
> PAUSE JOB job_id;
For more information, see `PAUSE JOB`.
To resume a paused changefeed:
{% include copy-clipboard.html %}
> RESUME JOB job_id;
For more information, see `RESUME JOB`.
To cancel a changefeed:
{% include copy-clipboard.html %}
> CANCEL JOB job_id;
For more information, see `CANCEL JOB`.
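If you have several changefeeds to act on at once, the plural forms of these statements accept a subquery. A sketch, assuming you want to cancel every running changefeed (the filter uses the `crdb_internal.jobs` columns shown in the monitoring section below):

> CANCEL JOBS (SELECT job_id FROM crdb_internal.jobs WHERE job_type = 'CHANGEFEED' AND status = 'running'); -- sketch: cancels all currently running changefeeds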
{{site.data.alerts.callout_info}} Monitoring is only available for enterprise changefeeds. {{site.data.alerts.end}}
Changefeed progress is exposed as a high-water timestamp that advances as the changefeed progresses. This is a guarantee that all changes before or at the timestamp have been emitted. You can monitor a changefeed:
- On the Jobs page of the Admin UI. Hover over the high-water timestamp to view the system time.
- Using `crdb_internal.jobs`:
{% include copy-clipboard.html %}
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  |                              description                               | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+------------------------------------------------------------------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | CREATE CHANGEFEED FOR TABLE office_dogs2 INTO 'kafka://localhost:9092' | ... | 1537279405671006870.0000000000 |       |              1
(1 row)
{{site.data.alerts.callout_info}} You can use the high-water timestamp to start a new changefeed where another ended. {{site.data.alerts.end}}
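For example, a new changefeed could pick up where the one above left off by passing its high-water timestamp to the `cursor` option. A sketch, reusing the sink and the sample timestamp from the output above:

> CREATE CHANGEFEED FOR TABLE office_dogs2 INTO 'kafka://localhost:9092' WITH cursor='1537279405671006870.0000000000'; -- timestamp taken from high_water_timestamp above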
{{site.data.alerts.callout_info}} Debugging is only available for enterprise changefeeds. {{site.data.alerts.end}}
For changefeeds connected to Kafka, use log information to debug connection issues (i.e., `kafka: client has run out of available brokers to talk to (Is your cluster reachable?)`). Debug by looking for lines in the logs with `[kafka-producer]` in them:
I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123 [kafka-producer] Initializing new client
I190312 18:56:53.535714 585 vendor/github.com/Shopify/sarama/client.go:724 [kafka-producer] client/metadata fetching metadata for all topics from broker localhost:9092
I190312 18:56:53.536730 569 vendor/github.com/Shopify/sarama/broker.go:148 [kafka-producer] Connected to broker at localhost:9092 (unregistered)
I190312 18:56:53.537661 585 vendor/github.com/Shopify/sarama/client.go:500 [kafka-producer] client/brokers registered new broker #0 at 172.16.94.87:9092
I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170 [kafka-producer] Successfully initialized new client
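You can also check whether the changefeed job itself has recorded an error; a sketch using the same `crdb_internal.jobs` table described in the monitoring section above:

> SELECT job_id, status, error FROM crdb_internal.jobs WHERE job_type = 'CHANGEFEED'; -- sketch: surfaces any error recorded on the job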
New in v19.1: In this example, you'll set up a core changefeed for a single-node cluster.
- In a terminal window, start `cockroach`:
{% include copy-clipboard.html %}
$ cockroach start --insecure --listen-addr=localhost --background
- As the `root` user, open the built-in SQL client:
{% include copy-clipboard.html %}
$ cockroach sql --url="postgresql://root@localhost:26257?sslmode=disable" --format=csv
{% include {{ page.version.version }}/cdc/core-url.md %}
{% include {{ page.version.version }}/cdc/core-csv.md %}
- Enable the `kv.rangefeed.enabled` cluster setting:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
- Create table `foo`:
{% include copy-clipboard.html %}
> CREATE TABLE foo (a INT PRIMARY KEY);
- Insert a row into the table:
{% include copy-clipboard.html %}
> INSERT INTO foo VALUES (0);
- Start the core changefeed:
{% include copy-clipboard.html %}
> EXPERIMENTAL CHANGEFEED FOR foo;
table,key,value
foo,[0],"{""after"": {""a"": 0}}"
- In a new terminal, add another row:
{% include copy-clipboard.html %}
$ cockroach sql --insecure -e "INSERT INTO foo VALUES (1)"
- Back in the terminal where the core changefeed is streaming, the following output has appeared:
foo,[1],"{""after"": {""a"": 1}}"
Note that records may take a couple of seconds to display in the core changefeed.
- To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
- To stop `cockroach`, run:
{% include copy-clipboard.html %}
$ cockroach quit --insecure
New in v19.1: In this example, you'll set up a core changefeed for a single-node cluster that emits Avro records.
- In a terminal window, start `cockroach`:
{% include copy-clipboard.html %}
$ cockroach start --insecure --listen-addr=localhost --background
- Download and extract the Confluent Open Source platform.
- Move into the extracted `confluent-<version>` directory and start Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent start
Only `zookeeper`, `kafka`, and `schema-registry` are needed. To troubleshoot Confluent, see their docs.
- As the `root` user, open the built-in SQL client:
{% include copy-clipboard.html %}
$ cockroach sql --url="postgresql://root@localhost:26257?sslmode=disable" --format=csv
{% include {{ page.version.version }}/cdc/core-url.md %}
- Enable the `kv.rangefeed.enabled` cluster setting:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
- Create table `bar`:
{% include copy-clipboard.html %}
> CREATE TABLE bar (a INT PRIMARY KEY);
- Insert a row into the table:
{% include copy-clipboard.html %}
> INSERT INTO bar VALUES (0);
- Start the core changefeed:
{% include copy-clipboard.html %}
> EXPERIMENTAL CHANGEFEED FOR bar WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081';
table,key,value
bar,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000
- In a new terminal, add another row:
{% include copy-clipboard.html %}
$ cockroach sql --insecure -e "INSERT INTO bar VALUES (1)"
- Back in the terminal where the core changefeed is streaming, the output will appear:
bar,\000\000\000\000\001\002\002,\000\000\000\000\002\002\002\002
Note that records may take a couple of seconds to display in the core changefeed.
- To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
- To stop `cockroach`, run:
{% include copy-clipboard.html %}
$ cockroach quit --insecure
- To stop Confluent, move into the extracted `confluent-<version>` directory and stop Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent stop
To stop all Confluent processes, use:
{% include copy-clipboard.html %}
$ ./bin/confluent destroy
{{site.data.alerts.callout_info}}
`CREATE CHANGEFEED` is an enterprise-only feature. For the core version, see the `CHANGEFEED FOR` example above.
{{site.data.alerts.end}}
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink.
- If you do not already have one, request a trial enterprise license.
- In a terminal window, start `cockroach`:
{% include copy-clipboard.html %}
$ cockroach start --insecure --listen-addr=localhost --background
- Download and extract the Confluent Open Source platform (which includes Kafka).
- Move into the extracted `confluent-<version>` directory and start Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent start
Only `zookeeper` and `kafka` are needed. To troubleshoot Confluent, see their docs.
- Create a Kafka topic:
{% include copy-clipboard.html %}
$ ./bin/kafka-topics \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic office_dogs
{{site.data.alerts.callout_info}} You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor. {{site.data.alerts.end}}
- As the `root` user, open the built-in SQL client:
{% include copy-clipboard.html %}
$ cockroach sql --insecure
- Set your organization name and enterprise license key that you received via email:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING cluster.organization = '<organization name>';
{% include copy-clipboard.html %}
> SET CLUSTER SETTING enterprise.license = '<secret>';
- Enable the `kv.rangefeed.enabled` cluster setting:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
- Create a database called `cdc_demo`:
{% include copy-clipboard.html %}
> CREATE DATABASE cdc_demo;
- Set the database as the default:
{% include copy-clipboard.html %}
> SET DATABASE = cdc_demo;
- Create a table and add data:
{% include copy-clipboard.html %}
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
{% include copy-clipboard.html %}
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
{% include copy-clipboard.html %}
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
- Start the changefeed:
{% include copy-clipboard.html %}
> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092';
        job_id
+--------------------+
  360645287206223873
(1 row)
This will start up the changefeed in the background and return the `job_id`. The changefeed writes to Kafka.
- In a new terminal, move into the extracted `confluent-<version>` directory and start watching the Kafka topic:
{% include copy-clipboard.html %}
$ ./bin/kafka-console-consumer \
--bootstrap-server=localhost:9092 \
--property print.key=true \
--from-beginning \
--topic=office_dogs
[1] {"id": 1, "name": "Petee H"} [2] {"id": 2, "name": "Carl"}
Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of `"Petee"` is omitted).
- Back in the SQL client, insert more data:
{% include copy-clipboard.html %}
> INSERT INTO office_dogs VALUES (3, 'Ernie');
- Back in the terminal where you're watching the Kafka topic, the following output has appeared:
[3] {"id": 3, "name": "Ernie"}
- When you are done, exit the SQL shell (`\q`).
- To stop `cockroach`, run:
{% include copy-clipboard.html %}
$ cockroach quit --insecure
- To stop Kafka, move into the extracted `confluent-<version>` directory and stop Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent stop
{{site.data.alerts.callout_info}}
`CREATE CHANGEFEED` is an enterprise-only feature. For the core version, see the `CHANGEFEED FOR` example above.
{{site.data.alerts.end}}
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink and emits Avro records.
- If you do not already have one, request a trial enterprise license.
- In a terminal window, start `cockroach`:
{% include copy-clipboard.html %}
$ cockroach start --insecure --listen-addr=localhost --background
- Download and extract the Confluent Open Source platform (which includes Kafka).
- Move into the extracted `confluent-<version>` directory and start Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent start
Only `zookeeper`, `kafka`, and `schema-registry` are needed. To troubleshoot Confluent, see their docs.
- Create a Kafka topic:
{% include copy-clipboard.html %}
$ ./bin/kafka-topics \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic office_dogs
{{site.data.alerts.callout_info}} You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor. {{site.data.alerts.end}}
- As the `root` user, open the built-in SQL client:
{% include copy-clipboard.html %}
$ cockroach sql --insecure
- Set your organization name and enterprise license key that you received via email:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING cluster.organization = '<organization name>';
{% include copy-clipboard.html %}
> SET CLUSTER SETTING enterprise.license = '<secret>';
- Enable the `kv.rangefeed.enabled` cluster setting:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
- Create a database called `cdc_demo`:
{% include copy-clipboard.html %}
> CREATE DATABASE cdc_demo;
- Set the database as the default:
{% include copy-clipboard.html %}
> SET DATABASE = cdc_demo;
- Create a table and add data:
{% include copy-clipboard.html %}
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
{% include copy-clipboard.html %}
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
{% include copy-clipboard.html %}
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
- Start the changefeed:
{% include copy-clipboard.html %}
> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081';
        job_id
+--------------------+
  360645287206223873
(1 row)
This will start up the changefeed in the background and return the `job_id`. The changefeed writes to Kafka.
- In a new terminal, move into the extracted `confluent-<version>` directory and start watching the Kafka topic:
{% include copy-clipboard.html %}
$ ./bin/kafka-avro-console-consumer \
--bootstrap-server=localhost:9092 \
--property print.key=true \
--from-beginning \
--topic=office_dogs
{"id":1} {"id":1,"name":{"string":"Petee H"}} {"id":2} {"id":2,"name":{"string":"Carl"}}
Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of `"Petee"` is omitted).
- Back in the SQL client, insert more data:
{% include copy-clipboard.html %}
> INSERT INTO office_dogs VALUES (3, 'Ernie');
- Back in the terminal where you're watching the Kafka topic, the following output has appeared:
{"id":3} {"id":3,"name":{"string":"Ernie"}}
- When you are done, exit the SQL shell (`\q`).
- To stop `cockroach`, run:
{% include copy-clipboard.html %}
$ cockroach quit --insecure
- To stop Kafka, move into the extracted `confluent-<version>` directory and stop Confluent:
{% include copy-clipboard.html %}
$ ./bin/confluent stop
{{site.data.alerts.callout_info}}
`CREATE CHANGEFEED` is an enterprise-only feature. For the core version, see the `CHANGEFEED FOR` example above.
{{site.data.alerts.end}}
{% include {{ page.version.version }}/misc/experimental-warning.md %}
New in v19.1: In this example, you'll set up a changefeed for a single-node cluster that is connected to an AWS S3 sink. Note that you can set up changefeeds for other cloud storage sinks as well.
- If you do not already have one, request a trial enterprise license.
- In a terminal window, start `cockroach`:
{% include copy-clipboard.html %}
$ cockroach start --insecure --listen-addr=localhost --background
- As the `root` user, open the built-in SQL client:
{% include copy-clipboard.html %}
$ cockroach sql --insecure
- Set your organization name and enterprise license key that you received via email:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING cluster.organization = '<organization name>';
{% include copy-clipboard.html %}
> SET CLUSTER SETTING enterprise.license = '<secret>';
- Enable the `kv.rangefeed.enabled` cluster setting:
{% include copy-clipboard.html %}
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
- Create a database called `cdc_demo`:
{% include copy-clipboard.html %}
> CREATE DATABASE cdc_demo;
- Set the database as the default:
{% include copy-clipboard.html %}
> SET DATABASE = cdc_demo;
- Create a table and add data:
{% include copy-clipboard.html %}
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
{% include copy-clipboard.html %}
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
{% include copy-clipboard.html %}
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
- Start the changefeed:
{% include copy-clipboard.html %}
> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=enter_key_here&AWS_SECRET_ACCESS_KEY=enter_key_here' WITH updated, resolved='10s';
        job_id
+--------------------+
  360645287206223873
(1 row)
This will start up the changefeed in the background and return the `job_id`. The changefeed writes to AWS.
- Monitor your changefeed on the Admin UI (http://localhost:8080/#/metrics/changefeeds/cluster). For more information, see Changefeeds Dashboard.
- When you are done, exit the SQL shell (`\q`).
- To stop `cockroach`, run:
{% include copy-clipboard.html %}
$ cockroach quit --insecure
{% include {{ page.version.version }}/known-limitations/cdc.md %}