---
title: CREATE CHANGEFEED
summary: The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions.
toc: true
---

The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions.

Changefeeds target a whitelist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (JSON) to a configurable sink (Kafka).

For more information, see Change Data Capture.

{{site.data.alerts.callout_info}} CREATE CHANGEFEED is an enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR. {{site.data.alerts.end}}

## Required privileges

Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
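
If you need a user other than `root` to create changefeeds, one approach is to add that user to the `admin` role. A minimal sketch, assuming role-based access control is available in your cluster and that the (hypothetical) user `maxroach` already exists:

~~~ sql
-- Make the existing user maxroach a member of the admin role
-- (maxroach is an illustrative username).
GRANT admin TO maxroach;
~~~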

## Synopsis

{% include {{ page.version.version }}/sql/diagrams/create_changefeed.html %}

## Parameters

Parameter | Description
----------|------------
`table_name` | The name of the table (or tables in a comma-separated list) to create a changefeed for.
`sink` | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below.
`option` / `value` | For a list of available options and their values, see Options below.

## Sink URI

The sink URI follows the basic format of:

~~~
'[scheme]://[host]:[port]?[query_parameters]'
~~~

The scheme can be kafka or any cloud storage sink.

### Kafka

Example of a Kafka sink URI:

~~~
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
~~~

Query parameters include:

Parameter | Value | Description
----------|-------|------------
`topic_prefix` | `STRING` | Adds a prefix to all of the topic names.<br><br>For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`.
`tls_enabled=true` | `BOOL` | If `true`, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a `ca_cert` (see below).
`ca_cert` | `STRING` | The base64-encoded `ca_cert` file.<br><br>Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`.
`sasl_enabled` | `BOOL` | If `true`, use SASL/PLAIN to authenticate. This requires a `sasl_user` and `sasl_password` (see below).
`sasl_user` | `STRING` | Your SASL username.
`sasl_password` | `STRING` | Your SASL password.
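
Putting these parameters together, the following is a minimal sketch of a changefeed pointed at a TLS- and SASL-enabled Kafka cluster; the broker address, certificate contents, and credentials are the placeholder values from the URI example above:

~~~ sql
-- Emit changes for office_dogs to topics prefixed with bar_ over a
-- TLS connection, authenticating with SASL/PLAIN.
> CREATE CHANGEFEED FOR TABLE office_dogs
    INTO 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
    WITH updated, resolved;
~~~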

### Cloud storage sink

Example of a cloud storage sink (i.e., AWS S3) URI:

~~~
'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ'
~~~

{{site.data.alerts.callout_info}} The scheme for a cloud storage sink should be prepended with experimental-. {{site.data.alerts.end}}

Any of the cloud storages below can be used as a sink:

{% include {{ page.version.version }}/misc/external-urls.md %}
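
For instance, a sketch of a changefeed writing to the S3 bucket from the URI example above (the bucket name and credentials are placeholders), mirroring the options used in the Kafka examples:

~~~ sql
-- Write change records for office_dogs to the (placeholder) S3 bucket.
> CREATE CHANGEFEED FOR TABLE office_dogs
    INTO 'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ'
    WITH updated, resolved;
~~~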

## Options

Option | Value | Description
-------|-------|------------
`updated` | N/A | Include updated timestamps with each row.<br><br>If a `cursor` is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a `cursor` is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid.
`resolved` | `INTERVAL` | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.<br><br>Example: `resolved='10s'`
`envelope` | `key_only` / `wrapped` | Use `key_only` to emit only the key and no value, which is faster if you only want to know when the key changes.<br><br>Default: `envelope=wrapped`
`cursor` | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.<br><br>When starting a changefeed at a specific `cursor`, the `cursor` cannot be before the configured garbage collection window (see `gc.ttlseconds`) for the table you're trying to follow; otherwise, the changefeed will error. By default, you cannot create a changefeed that starts more than 25 hours in the past.<br><br>`cursor` can be used to start a new changefeed where a previous changefeed ended.<br><br>Example: `CURSOR=1536242855577149065.0000000000`
`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for Avro is limited and experimental. For mappings of CockroachDB types to Avro types, see the table below.<br><br>Default: `format=json`
`confluent_schema_registry` | Schema Registry address | The Schema Registry address is required to use `experimental_avro`.
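
As an illustration of how options combine, the following sketch includes updated timestamps and emits resolved timestamps at most every 10 seconds (the broker address is a placeholder):

~~~ sql
-- Include MVCC/updated timestamps and emit a resolved timestamp
-- no more than once every 10 seconds.
> CREATE CHANGEFEED FOR TABLE office_dogs
    INTO 'kafka://broker.address.com:9092'
    WITH updated, resolved='10s';
~~~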

### Avro limitations

Currently, support for Avro is limited and experimental. Below is a list of unsupported SQL types and values for Avro changefeeds:

- Decimals must have precision specified.

- Decimals with `NaN` or infinite values cannot be written in Avro.

    {{site.data.alerts.callout_info}} To avoid `NaN` or infinite values, add a `CHECK` constraint to prevent these values from being inserted into decimal columns (see the example after this list). {{site.data.alerts.end}}

- `TIME`, `DATE`, `INTERVAL`, `UUID`, `INET`, `ARRAY`, `JSONB`, `BIT`, and collated `STRING` are not supported in Avro yet.
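
A minimal sketch of such a `CHECK` constraint, using a hypothetical `prices` table with a `DECIMAL` column `amount`:

~~~ sql
-- Hypothetical table; the range bound is an application-specific choice.
-- Under PostgreSQL-style decimal ordering (which CockroachDB follows),
-- NaN and +/-Infinity fall outside any finite range, so this check keeps
-- them out of the column before they can reach an Avro changefeed.
CREATE TABLE prices (
    id INT PRIMARY KEY,
    amount DECIMAL(10,2) CHECK (amount BETWEEN -99999999.99 AND 99999999.99)
);
~~~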

### Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type | Avro Type | Avro Logical Type
-----------------|-----------|------------------
`INT` | `LONG` |
`BOOL` | `BOOLEAN` |
`FLOAT` | `DOUBLE` |
`STRING` | `STRING` |
`DATE` | `INT` | `DATE`
`TIME` | `LONG` | `TIME-MICROS`
`TIMESTAMP` | `LONG` | `TIME-MICROS`
`TIMESTAMPTZ` | `LONG` | `TIME-MICROS`
`DECIMAL` | `BYTES` | `DECIMAL`
`UUID` | `STRING` |
`INET` | `STRING` |
`JSONB` | `STRING` |

## Responses

The messages (i.e., keys and values) emitted to a Kafka topic are specific to the envelope. The default format is `wrapped`, and the output messages are composed of the following:

- **Key**: An array always composed of the row's `PRIMARY KEY` field(s) (e.g., `[1]` for JSON or `{"id":1}` for Avro).
- **Value**:
    - One of three possible top-level fields:
        - `after`, which contains the state of the row after the update (or `null` for `DELETE`s).
        - `updated`, which contains the updated timestamp.
        - `resolved`, which is emitted for records representing resolved timestamps. These records do not include an `after` value since they only function as checkpoints.
    - For `INSERT` and `UPDATE`, the current state of the row inserted or updated.
    - For `DELETE`, `null`.

For example:

Statement | Response
----------|---------
`INSERT INTO office_dogs VALUES (1, 'Petee');` | JSON: `[1] {"after": {"id": 1, "name": "Petee"}}`<br>Avro: `{"id":1} {"id":1,"name":{"string":"Petee"}}`
`DELETE FROM office_dogs WHERE name = 'Petee'` | JSON: `[1] {"after": null}`<br>Avro: `{"id":1} {null}`

## Examples

### Create a changefeed connected to Kafka

{% include copy-clipboard.html %}

~~~ sql
> CREATE CHANGEFEED FOR TABLE name INTO 'kafka://host:port' WITH updated, resolved;
~~~

~~~
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
~~~

For more information on how to create a changefeed connected to Kafka, see Change Data Capture.

### Create a changefeed connected to Kafka using Avro

{% include copy-clipboard.html %}

~~~ sql
> CREATE CHANGEFEED FOR TABLE name INTO 'kafka://host:port' WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>;
~~~

~~~
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
~~~

For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.

### Create a changefeed connected to a cloud storage sink

{% include {{ page.version.version }}/misc/experimental-warning.md %}

{% include copy-clipboard.html %}

~~~ sql
> CREATE CHANGEFEED FOR TABLE name INTO 'experimental-scheme://host?parameters' WITH updated, resolved;
~~~

~~~
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
~~~

For more information on how to create a changefeed connected to a cloud storage sink, see Change Data Capture.

### Manage a changefeed

Use the following SQL statements to pause, resume, and cancel a changefeed.

{{site.data.alerts.callout_info}} Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future. {{site.data.alerts.end}}

#### Pause a changefeed

{% include copy-clipboard.html %}

~~~ sql
> PAUSE JOB job_id;
~~~

For more information, see PAUSE JOB.

#### Resume a paused changefeed

{% include copy-clipboard.html %}

~~~ sql
> RESUME JOB job_id;
~~~

For more information, see RESUME JOB.

#### Cancel a changefeed

{% include copy-clipboard.html %}

~~~ sql
> CANCEL JOB job_id;
~~~

For more information, see CANCEL JOB.

### Start a new changefeed where another ended

Find the high-water timestamp for the ended changefeed:

{% include copy-clipboard.html %}

~~~ sql
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
~~~

~~~
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)
~~~

Use the high_water_timestamp to start the new changefeed:

{% include copy-clipboard.html %}

~~~ sql
> CREATE CHANGEFEED FOR TABLE name INTO 'kafka://host:port' WITH cursor = <high_water_timestamp>;
~~~

Note that because the cursor is provided, the initial scan is not performed.
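
If the high-water timestamp has already fallen outside the garbage collection window, the new changefeed will error. One possible workaround, assuming you can change the watched table's zone configuration before the relevant revisions are garbage collected, is to lengthen `gc.ttlseconds` (the 48-hour value below is only illustrative):

~~~ sql
-- Retain older MVCC revisions (and therefore older cursor timestamps)
-- for 48 hours instead of the default 25 hours.
ALTER TABLE name CONFIGURE ZONE USING gc.ttlseconds = 172800;
~~~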

## See also