Skip to content

Commit

Permalink
Merge pull request #4403 from cockroachdb/cdc
Browse files Browse the repository at this point in the history
CDC: 19.1 updates
  • Loading branch information
lhirata authored Apr 3, 2019
2 parents 045c37e + 0959dfb commit 6bba1a9
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 54 deletions.
2 changes: 1 addition & 1 deletion _includes/v19.1/cdc/core-url.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{{site.data.alerts.callout_info}}
Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client. This can cause unbounded delivery latency for core changefeeds, so the `results_buffer_size` connection string parameter is used to disable buffering. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a [`CANCEL QUERY`](cancel-query.html) statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.
Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a [`CANCEL QUERY`](cancel-query.html) statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.
{{site.data.alerts.end}}
231 changes: 188 additions & 43 deletions v19.1/change-data-capture.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion v19.1/changefeed-for.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ foo,\000\000\000\000\001\002\024,\000\000\000\000\002\002\002\024

To stop streaming the changefeed, enter **CTRL+C** into the terminal where the changefeed is running.

For more information on how to create a core changefeed, see [Change Data Capture](change-data-capture.html#create-a-core-changefeed-with-avro).
For more information on how to create a core changefeed, see [Change Data Capture](change-data-capture.html#create-a-core-changefeed-using-avro).

<!-- ### Pause and resume a changefeed
Expand Down
129 changes: 120 additions & 9 deletions v19.1/create-changefeed.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toc: true

The `CREATE CHANGEFEED` [statement](sql-statements.html) creates a new changefeed, which provides row-level change subscriptions.

Changefeeds target a whitelist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (`JSON`) to a configurable sink ([Kafka](https://kafka.apache.org/)).
Changefeeds target a whitelist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (`JSON`) to a configurable sink ([Kafka](https://kafka.apache.org/) or a [cloud storage sink](#cloud-storage-sink)).

For more information, see [Change Data Capture](change-data-capture.html).

Expand All @@ -29,22 +29,72 @@ Changefeeds can only be created by superusers, i.e., [members of the `admin` rol
Parameter | Description
----------|------------
`table_name` | The name of the table (or tables in a comma separated list) to create a changefeed for.
`sink` | The location of the configurable sink. The scheme of the URI indicates the type; currently, only `kafka`. There are query parameters that vary per type. Currently, the `kafka` scheme only has `topic_prefix`, which adds a prefix to all of the topic names.<br><br>Sink URI scheme: `'[scheme]://[host]:[port][?topic_prefix=[foo]]'` <br><br>For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`.
`sink` | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see [Sink URI](#sink-uri) below.
`option` / `value` | For a list of available options and their values, see [Options](#options) below.


<!-- `IF NOT EXISTS` | Create a new changefeed only if a changefeed of the same name does not already exist; if one does exist, do not return an error.
`name` | The name of the changefeed to create, which [must be unique](#create-fails-name-already-in-use) and follow these [identifier rules](keywords-and-identifiers.html#identifiers). -->

### Sink URI

The sink URI follows the basic format of:

~~~
'[scheme]://[host]:[port]?[query_parameters]'
~~~

The `scheme` can be [`kafka`](#kafka) or any [cloud storage sink](#cloud-storage-sink).

#### Kafka

Example of a Kafka sink URI:

~~~
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
~~~

Query parameters include:

Parameter | Value | Description
----------+-------+---------------
`topic_prefix` | [`STRING`](string.html) | Adds a prefix to all topic names.<br><br>For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`.
`tls_enabled=true` | [`BOOL`](bool.html) | If `true`, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a `ca_cert` (see below).
`ca_cert` | [`STRING`](string.html) | The base64-encoded `ca_cert` file.<br><br>Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`.
`sasl_enabled` | [`BOOL`](bool.html) | If `true`, [use SASL/PLAIN to authenticate](https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html). This requires a `sasl_user` and `sasl_password` (see below).
`sasl_user` | [`STRING`](string.html) | Your SASL username.
`sasl_password` | [`STRING`](string.html) | Your SASL password.

#### Cloud storage sink

<span class="version-tag">New in v19.1:</span> Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.

{{site.data.alerts.callout_info}}
Currently, cloud storage sinks only work with `JSON` and emits newline-delimited `JSON` files.
{{site.data.alerts.end}}

Example of a cloud storage sink (i.e., AWS S3) URI:

~~~
'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ'
~~~

{{site.data.alerts.callout_info}}
The `scheme` for a cloud storage sink should be prepended with `experimental-`.
{{site.data.alerts.end}}

Any of the cloud storages below can be used as a sink:

{% include {{ page.version.version }}/misc/external-urls.md %}

### Options

Option | Value | Description
-------|-------|------------
`updated` | N/A | Include updated timestamps with each row.
`updated` | N/A | Include updated timestamps with each row.<br><br>If a `cursor` is provided, the "updated" timestamps will match the [MVCC](../v19.1/architecture/storage-layer.html#mvcc) timestamps of the emitted rows, and there is no initial scan. If a `cursor` is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a [backfill is performed for a schema change](change-data-capture.html#schema-changes-with-column-backfill), the "updated" timestamp is set to the first timestamp for when the new schema is valid.
`resolved` | [`INTERVAL`](interval.html) | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.<br><br>Example: `resolved='10s'`
`envelope` | `key_only` / `wrapped` | Use `key_only` to emit only the key and no value, which is faster if you only want to know when the key changes.<br><br>Default: `envelope=wrapped`
`cursor` | [Timestamp](as-of-system-time.html#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.<br><br>`cursor` can be used to [start a new changefeed where a previous changefeed ended.](#start-a-new-changefeed-where-another-ended)<br><br>Example: `CURSOR=1536242855577149065.0000000000`
`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for [Avro is limited and experimental](#avro-limitations). <br><br>Default: `format=json`.
`cursor` | [Timestamp](as-of-system-time.html#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.<br><br>When starting a changefeed at a specific `cursor`, the `cursor` cannot be before the configured garbage collection window (see [`gc.ttlseconds`](configure-replication-zones.html#replication-zone-variables)) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past.<br><br>`cursor` can be used to [start a new changefeed where a previous changefeed ended.](#start-a-new-changefeed-where-another-ended)<br><br>Example: `CURSOR=1536242855577149065.0000000000`
`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for [Avro is limited and experimental](#avro-limitations). For mappings of CockroachDB types to Avro types, [see the table below](#avro-types). <br><br>Default: `format=json`.
`confluent_schema_registry` | Schema Registry address | The [Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html#sr) address is required to use `experimental_avro`.

#### Avro limitations
Expand All @@ -60,9 +110,48 @@ Currently, support for Avro is limited and experimental. Below is a list of unsu

- [`TIME`, `DATE`, `INTERVAL`](https://github.com/cockroachdb/cockroach/issues/32472), [`UUID`, `INET`](https://github.com/cockroachdb/cockroach/issues/34417), [`ARRAY`](https://github.com/cockroachdb/cockroach/issues/34420), [`JSONB`](https://github.com/cockroachdb/cockroach/issues/34421), `BIT`, and collated `STRING` are not supported in Avro yet.

#### Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type | Avro Type | Avro Logical Type
-----------------+-----------+---------------------
[`INT`](int.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`BOOL`](bool.html) | [`BOOLEAN`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`FLOAT`](float.html) | [`DOUBLE`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`STRING`](string.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`DATE`](date.html) | [`INT`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`DATE`](https://avro.apache.org/docs/1.8.1/spec.html#Date)
[`TIME`](time.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29)
[`TIMESTAMP`](timestamp.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29)
[`TIMESTAMPTZ`](timestamp.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29)
[`DECIMAL`](decimal.html) | [`BYTES`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`DECIMAL`](https://avro.apache.org/docs/1.8.1/spec.html#Decimal)
[`UUID`](uuid.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`INET`](inet.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |
[`JSONB`](jsonb.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) |

## Responses

The messages (i.e., keys and values) emitted to a Kafka topic are specific to the [`envelope`](#options). The default format is `wrapped`, and the output messages are composed of the following:

- **Key**: An array always composed of the row's `PRIMARY KEY` field(s) (e.g., `[1]` for `JSON` or `{"id":1}` for Avro).
- **Value**:
- One of three possible top-level fields:
- `after`, which contains the state of the row after the update (or `null`' for `DELETE`s).
- `updated`, which contains the updated timestamp.
- `resolved`, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
- For [`INSERT`](insert.html) and [`UPDATE`](update.html), the current state of the row inserted or updated.
- For [`DELETE`](delete.html), `null`.

For example:

Statement | Response
-----------------------------------------------+-----------------------------------------------------------------------
`INSERT INTO office_dogs VALUES (1, 'Petee');` | JSON: `[1] {"after": {"id": 1, "name": "Petee"}}` </br>Avro: `{"id":1} {"id":1,"name":{"string":"Petee"}}`
`DELETE FROM office_dogs WHERE name = 'Petee'` | JSON: `[1] {"after": null}` </br>Avro: `{"id":1} {null}`

## Examples

### Create a changefeed
### Create a changefeed connected to Kafka

{% include copy-clipboard.html %}
~~~ sql
Expand All @@ -79,7 +168,7 @@ Currently, support for Avro is limited and experimental. Below is a list of unsu

For more information on how to create a changefeed connected to Kafka, see [Change Data Capture](change-data-capture.html#create-a-changefeed-connected-to-kafka).

### Create a changefeed with Avro
### Create a changefeed connected to Kafka using Avro

{% include copy-clipboard.html %}
~~~ sql
Expand All @@ -94,7 +183,26 @@ For more information on how to create a changefeed connected to Kafka, see [Chan
(1 row)
~~~

For more information on how to create a changefeed that emits an [Avro](https://avro.apache.org/docs/1.8.2/spec.html) record, see [Change Data Capture](change-data-capture.html#create-a-changefeed-in-avro-connected-to-kafka).
For more information on how to create a changefeed that emits an [Avro](https://avro.apache.org/docs/1.8.2/spec.html) record, see [Change Data Capture](change-data-capture.html#create-a-changefeed-connected-to-kafka-using-avro).

### Create a changefeed connected to a cloud storage sink

{% include {{ page.version.version }}/misc/experimental-warning.md %}

{% include copy-clipboard.html %}
~~~ sql
> CREATE CHANGEFEED FOR TABLE name INTO 'experimental-scheme://host?parameters' WITH updated, resolved;
~~~
~~~
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
~~~

For more information on how to create a changefeed connected to a cloud storage sink, see [Change Data Capture](change-data-capture.html#create-a-changefeed-connected-to-a-cloud-storage-sink).

### Manage a changefeed

Expand Down Expand Up @@ -153,7 +261,10 @@ Use the `high_water_timestamp` to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name INTO 'kafka//host:port' WITH cursor = <high_water_timestamp>;
~~~

Note that because the cursor is provided, the initial scan is not performed.

## See also

- [Change Data Capture](change-data-capture.html)
- [Other SQL Statements](sql-statements.html)
- [Changefeed Dashboard](admin-ui-cdc-dashboard.html)
6 changes: 6 additions & 0 deletions v19.1/migrate-from-mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ The following options are available to `IMPORT ... MYSQLDUMP`:

By default, [`IMPORT ... MYSQLDUMP`][import] supports foreign keys. **Default: false**. Add the `skip_foreign_keys` option to speed up data import by ignoring foreign key constraints in the dump file's DDL. It will also enable you to import individual tables that would otherwise fail due to dependencies on other tables.

{{site.data.alerts.callout_info}}
The most common dependency issues are caused by unsatisfied foreign key relationships. You can avoid these issues by adding the `skip_foreign_keys` option to your `IMPORT` statement as needed. For more information, see the list of [import options](import.html#import-options).

For example, if you get the error message `pq: there is no unique constraint matching given keys for referenced table tablename`, use `IMPORT ... WITH skip_foreign_keys`.
{{site.data.alerts.end}}

Example usage:

{% include copy-clipboard.html %}
Expand Down
6 changes: 6 additions & 0 deletions v19.1/migrate-from-postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ Example usage:

By default, [`IMPORT ... PGDUMP`][import] supports foreign keys. **Default: false**. Add the `skip_foreign_keys` option to speed up data import by ignoring foreign key constraints in the dump file's DDL. It will also enable you to import individual tables that would otherwise fail due to dependencies on other tables.

{{site.data.alerts.callout_info}}
The most common dependency issues are caused by unsatisfied foreign key relationships. You can avoid these issues by adding the `skip_foreign_keys` option to your `IMPORT` statement as needed. For more information, see the list of [import options](import.html#import-options).

For example, if you get the error message `pq: there is no unique constraint matching given keys for referenced table tablename`, use `IMPORT ... WITH skip_foreign_keys`.
{{site.data.alerts.end}}

Example usage:

{% include copy-clipboard.html %}
Expand Down
6 changes: 6 additions & 0 deletions v2.1/migrate-from-mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ The following options are available to `IMPORT ... MYSQLDUMP`:

By default, [`IMPORT ... MYSQLDUMP`][import] supports foreign keys. **Default: false**. Add the `skip_foreign_keys` option to speed up data import by ignoring foreign key constraints in the dump file's DDL. It will also enable you to import individual tables that would otherwise fail due to dependencies on other tables.

{{site.data.alerts.callout_info}}
The most common dependency issues are caused by unsatisfied foreign key relationships. You can avoid these issues by adding the `skip_foreign_keys` option to your `IMPORT` statement as needed. For more information, see the list of [import options](import.html#import-options).

For example, if you get the error message `pq: there is no unique constraint matching given keys for referenced table tablename`, use `IMPORT ... WITH skip_foreign_keys`.
{{site.data.alerts.end}}

Example usage:

{% include copy-clipboard.html %}
Expand Down
6 changes: 6 additions & 0 deletions v2.1/migrate-from-postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ Example usage:

By default, [`IMPORT ... PGDUMP`][import] supports foreign keys. **Default: false**. Add the `skip_foreign_keys` option to speed up data import by ignoring foreign key constraints in the dump file's DDL. It will also enable you to import individual tables that would otherwise fail due to dependencies on other tables.

{{site.data.alerts.callout_info}}
The most common dependency issues are caused by unsatisfied foreign key relationships. You can avoid these issues by adding the `skip_foreign_keys` option to your `IMPORT` statement as needed. For more information, see the list of [import options](import.html#import-options).

For example, if you get the error message `pq: there is no unique constraint matching given keys for referenced table tablename`, use `IMPORT ... WITH skip_foreign_keys`.
{{site.data.alerts.end}}

Example usage:

{% include copy-clipboard.html %}
Expand Down

0 comments on commit 6bba1a9

Please sign in to comment.