From 9d2aaec461212a78868a7171eb07f96d29872f85 Mon Sep 17 00:00:00 2001 From: Lauren Date: Tue, 19 Feb 2019 15:56:24 -0500 Subject: [PATCH 01/13] CDC Updates Changes include: - Added / edited Avro core changefeed instructions Minor edit Add expected responses for enterprise changefeeds CDC updates - Fix broken links - Add info about cursor - Add info about updated timestamps - Add info about schema changes with backfill --- _includes/v19.1/cdc/core-url.md | 2 +- _includes/v19.1/sql/settings/settings.md | 2 +- v19.1/change-data-capture.md | 94 +++++++++++++++--------- v19.1/create-changefeed.md | 22 +++++- 4 files changed, 81 insertions(+), 39 deletions(-) diff --git a/_includes/v19.1/cdc/core-url.md b/_includes/v19.1/cdc/core-url.md index f104028918d..e1d039d886e 100644 --- a/_includes/v19.1/cdc/core-url.md +++ b/_includes/v19.1/cdc/core-url.md @@ -1,3 +1,3 @@ {{site.data.alerts.callout_info}} -Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client. This can cause unbounded delivery latency for core changefeeds, so the `results_buffer_size` connection string parameter is used to disable buffering. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a [`CANCEL QUERY`](cancel-query.html) statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default. +Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a [`CANCEL QUERY`](cancel-query.html) statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default. {{site.data.alerts.end}} diff --git a/_includes/v19.1/sql/settings/settings.md b/_includes/v19.1/sql/settings/settings.md index 7dfae0dbece..6a22063fe5f 100644 --- a/_includes/v19.1/sql/settings/settings.md +++ b/_includes/v19.1/sql/settings/settings.md @@ -80,7 +80,7 @@ sql.defaults.distsqlenumeration1default distributed SQL execution mode [off = 0, auto = 1, on = 2] sql.defaults.experimental_vectorizeenumeration0default experimental_vectorize mode [off = 0, on = 1, always = 2] sql.defaults.optimizerenumeration1default cost-based optimizer mode [off = 0, on = 1, local = 2] -sql.defaults.results_buffer.sizebyte size16 KiBdefault size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client. This can be overridden on an individual connection with the 'results_buffer_size' parameter. 
Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering. +sql.defaults.results_buffer.sizebyte size16 KiBdefault size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client.s Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering. sql.defaults.serial_normalizationenumeration0default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2] sql.distsql.distribute_index_joinsbooleantrueif set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader sql.distsql.flow_stream_timeoutduration10samount of time incoming streams wait for a flow to be set up before erroring out diff --git a/v19.1/change-data-capture.md b/v19.1/change-data-capture.md index feaad790fe9..8d6dcc97164 100644 --- a/v19.1/change-data-capture.md +++ b/v19.1/change-data-capture.md @@ -53,7 +53,7 @@ The core feature of CDC is the [changefeed](create-changefeed.html). Changefeeds - Rows are sharded between Kafka partitions by the row’s [primary key](primary-key.html). -- The `UPDATED` option adds an "updated" timestamp to each emitted row. You can also use the `RESOLVED` option to emit periodic "resolved" timestamp messages to each Kafka partition. A **resolved timestamp** is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. +- The `UPDATED` option adds an "updated" timestamp to each emitted row. You can also use the `RESOLVED` option to emit periodic "resolved" timestamp messages to each Kafka partition. A "resolved" timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. For example: @@ -77,7 +77,11 @@ The core feature of CDC is the [changefeed](create-changefeed.html). Changefeeds When schema changes with column backfill (e.g., adding a column with a default, adding a computed column, adding a `NOT NULL` column, dropping a column) are made to watched rows, the changefeed will emit some duplicates during the backfill. When it finishes, CockroachDB outputs all watched rows using the new schema. -For example, start with the changefeed created in the [example below](#create-a-changefeed-connected-to-kafka): +Rows that have been backfilled by a schema change are always re-emitted because Avro's default schema change functionality is not powerful enough to represent the schema changes that CockroachDB supports (e.g., CockroachDB columns can have default values that are arbitrary SQL expressions, but Avro only supports static default values). + +To ensure that the Avro schemas that CockroachDB publishes will work with the (undocumented and inconsistent) schema compatibility rules used by the Confluent schema registry, CockroachDB emits all fields in Avro as nullable unions. 
This ensures that Avro and Confluent consider the schemas to be both backward- and forward-compatible. Note that the original CockroachDB column definition is also included in the schema as a doc field, so it's still possible to distinguish between a `NOT NULL` CockroachDB column and a `NULL` CockroachDB column. + +For an example of a schema change with column backfill, start with the changefeed created in the [example below](#create-a-changefeed-connected-to-kafka): ~~~ shell [1] {"id": 1, "name": "Petee H"} @@ -122,7 +126,7 @@ The `kv.closed_timestamp.target_duration` [cluster setting](cluster-settings.htm ## Configure a changefeed (Core) -## Create +### Create New in v19.1: To create a core changefeed: @@ -222,7 +226,7 @@ You can use the high-water timestamp to [start a new changefeed where another en {% include copy-clipboard.html %} ~~~ shell - ./cockroach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable&results_buffer_size=0" --format=csv + oach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable" --format=csv ~~~ {% include {{ page.version.version }}/cdc/core-url.md %} @@ -265,7 +269,7 @@ You can use the high-water timestamp to [start a new changefeed where another en {% include copy-clipboard.html %} ~~~ shell - ./cockroach sql --insecure -e "INSERT INTO foo VALUES (1)" + $ cockroach sql --insecure -e "INSERT INTO foo VALUES (1)" ~~~ 8. Back in the terminal where the core changefeed is streaming, the following output has appeared: @@ -311,7 +315,7 @@ You can use the high-water timestamp to [start a new changefeed where another en {% include copy-clipboard.html %} ~~~ shell - ./cockroach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable&results_buffer_size=0" --format=csv + $ cockroach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable" --format=csv ~~~ {% include {{ page.version.version }}/cdc/core-url.md %} @@ -343,25 +347,24 @@ You can use the high-water timestamp to [start a new changefeed where another en ~~~ sql > EXPERIMENTAL CHANGEFEED FOR bar WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081'; ~~~ - + bar,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000 + ~~~ 9. In a new terminal, add another row: {% include copy-clipboard.html %} ~~~ shell - ./cockroach sql --insecure -e "INSERT INTO bar VALUES (1)" + $ cockroach sql --insecure -e "INSERT INTO bar VALUES (1)" ~~~ -10. Back in the terminal where the core changefeed is streaming, the output will appear - - + bar,\000\000\000\000\001\002\002,\000\000\000\000\002\002\002\002 + ~~~ Note that records may take a couple of seconds to display in the core changefeed. @@ -381,6 +384,13 @@ You can use the high-water timestamp to [start a new changefeed where another en $ ./bin/confluent stop ~~~ + To stop all Confluent processes, use: + + {% include copy-clipboard.html %} + ~~~ shell + $ ./bin/confluent destroy + ~~~ + ### Create a changefeed connected to Kafka {{site.data.alerts.callout_info}} @@ -444,21 +454,28 @@ In this example, you'll set up a changefeed for a single-node cluster that is co > SET CLUSTER SETTING enterprise.license = ''; ~~~ -8. Create a database called `cdc_demo`: +8. Enable the `kv.rangefeed.enabled` [cluster setting](cluster-settings.html): + + {% include copy-clipboard.html %} + ~~~ sql + > SET CLUSTER SETTING kv.rangefeed.enabled = true; + ~~~ + +9. Create a database called `cdc_demo`: {% include copy-clipboard.html %} ~~~ sql > CREATE DATABASE cdc_demo; ~~~ -9. Set the database as the default: +10. 
Set the database as the default: {% include copy-clipboard.html %} ~~~ sql > SET DATABASE = cdc_demo; ~~~ -10. Create a table and add data: +11. Create a table and add data: {% include copy-clipboard.html %} ~~~ sql @@ -479,7 +496,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1; ~~~ -11. Start the changefeed: +12. Start the changefeed: {% include copy-clipboard.html %} ~~~ sql @@ -495,7 +512,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co This will start up the changefeed in the background and return the `job_id`. The changefeed writes to Kafka. -12. In a new terminal, move into the extracted `confluent-` directory and start watching the Kafka topic: +13. In a new terminal, move into the extracted `confluent-` directory and start watching the Kafka topic: {% include copy-clipboard.html %} ~~~ shell @@ -513,29 +530,29 @@ In this example, you'll set up a changefeed for a single-node cluster that is co Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of `"Petee"` is omitted). -13. Back in the SQL client, insert more data: +14. Back in the SQL client, insert more data: {% include copy-clipboard.html %} ~~~ sql > INSERT INTO office_dogs VALUES (3, 'Ernie'); ~~~ -14. Back in the terminal where you're watching the Kafka topic, the following output has appeared: +15. Back in the terminal where you're watching the Kafka topic, the following output has appeared: ~~~ shell [3] {"id": 3, "name": "Ernie"} ~~~ -15. When you are done, exit the SQL shell (`\q`). +16. When you are done, exit the SQL shell (`\q`). -16. To stop `cockroach`, run: +17. To stop `cockroach`, run: {% include copy-clipboard.html %} ~~~ shell $ cockroach quit --insecure ~~~ -17. To stop Kafka, move into the extracted `confluent-` directory and stop Confluent: +18. To stop Kafka, move into the extracted `confluent-` directory and stop Confluent: {% include copy-clipboard.html %} ~~~ shell @@ -605,21 +622,28 @@ In this example, you'll set up a changefeed for a single-node cluster that is co > SET CLUSTER SETTING enterprise.license = ''; ~~~ -8. Create a database called `cdc_demo`: +8. Enable the `kv.rangefeed.enabled` [cluster setting](cluster-settings.html): + + {% include copy-clipboard.html %} + ~~~ sql + > SET CLUSTER SETTING kv.rangefeed.enabled = true; + ~~~ + +9. Create a database called `cdc_demo`: {% include copy-clipboard.html %} ~~~ sql > CREATE DATABASE cdc_demo; ~~~ -9. Set the database as the default: +10. Set the database as the default: {% include copy-clipboard.html %} ~~~ sql > SET DATABASE = cdc_demo; ~~~ -10. Create a table and add data: +11. Create a table and add data: {% include copy-clipboard.html %} ~~~ sql @@ -640,7 +664,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1; ~~~ -11. Start the changefeed: +12. Start the changefeed: {% include copy-clipboard.html %} ~~~ sql @@ -656,7 +680,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co This will start up the changefeed in the background and return the `job_id`. The changefeed writes to Kafka. -12. In a new terminal, move into the extracted `confluent-` directory and start watching the Kafka topic: +13. 
In a new terminal, move into the extracted `confluent-` directory and start watching the Kafka topic: {% include copy-clipboard.html %} ~~~ shell @@ -674,29 +698,29 @@ In this example, you'll set up a changefeed for a single-node cluster that is co Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of `"Petee"` is omitted). -13. Back in the SQL client, insert more data: +14. Back in the SQL client, insert more data: {% include copy-clipboard.html %} ~~~ sql > INSERT INTO office_dogs VALUES (3, 'Ernie'); ~~~ -14. Back in the terminal where you're watching the Kafka topic, the following output has appeared: +15. Back in the terminal where you're watching the Kafka topic, the following output has appeared: ~~~ shell {"id":3} {"id":3,"name":{"string":"Ernie"}} ~~~ -15. When you are done, exit the SQL shell (`\q`). +16. When you are done, exit the SQL shell (`\q`). -16. To stop `cockroach`, run: +17. To stop `cockroach`, run: {% include copy-clipboard.html %} ~~~ shell $ cockroach quit --insecure ~~~ -17. To stop Kafka, move into the extracted `confluent-` directory and stop Confluent: +18. To stop Kafka, move into the extracted `confluent-` directory and stop Confluent: {% include copy-clipboard.html %} ~~~ shell diff --git a/v19.1/create-changefeed.md b/v19.1/create-changefeed.md index dedb7406c10..cc6f5e8c8bf 100644 --- a/v19.1/create-changefeed.md +++ b/v19.1/create-changefeed.md @@ -40,10 +40,10 @@ Parameter | Description Option | Value | Description -------|-------|------------ -`updated` | N/A | Include updated timestamps with each row. +`updated` | N/A | Include updated timestamps with each row.

If a `cursor` is provided, the "updated" timestamps will match the [MVCC](architecture/storage-layer.html#mvcc) timestamps of the emitted rows, and there is no initial scan. If a `cursor` is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a [backfill is performed for a schema change](change-data-capture.html#schema-changes-with-column-backfill), the "updated" timestamp is set to the first timestamp for when the new schema is valid.
 `resolved` | [`INTERVAL`](interval.html) | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.

Example: `resolved='10s'` `envelope` | `key_only` / `wrapped` | Use `key_only` to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: `envelope=wrapped` -`cursor` | [Timestamp](as-of-system-time.html#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

`cursor` can be used to [start a new changefeed where a previous changefeed ended.](#start-a-new-changefeed-where-another-ended)

Example: `CURSOR=1536242855577149065.0000000000` +`cursor` | [Timestamp](as-of-system-time.html#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

When starting a changefeed at a specific `cursor`, the `cursor` cannot be before the configured garbage collection window (see [`gc.ttlseconds`](configure-replication-zones.html#replication-zone-variables)) for the table you're trying to follow; otherwise, the changefeed will error. Because the default garbage collection window is 25 hours, by default you cannot create a changefeed that starts more than 25 hours in the past.

`cursor` can be used to [start a new changefeed where a previous changefeed ended.](#start-a-new-changefeed-where-another-ended)

Example: `CURSOR=1536242855577149065.0000000000` `format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for [Avro is limited and experimental](#avro-limitations).

Default: `format=json`. `confluent_schema_registry` | Schema Registry address | The [Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html#sr) address is required to use `experimental_avro`. @@ -60,6 +60,22 @@ Currently, support for Avro is limited and experimental. Below is a list of unsu - [`TIME`, `DATE`, `INTERVAL`](https://github.com/cockroachdb/cockroach/issues/32472), [`UUID`, `INET`](https://github.com/cockroachdb/cockroach/issues/34417), [`ARRAY`](https://github.com/cockroachdb/cockroach/issues/34420), [`JSONB`](https://github.com/cockroachdb/cockroach/issues/34421), `BIT`, and collated `STRING` are not supported in Avro yet. +## Responses + +The messages (i.e., keys and values) emitted to a Kafka topic are composed of the following: + +- **Key**: Always composed of the table's `PRIMARY KEY` field (e.g., `[1]` or `{"id":1}`). +- **Value**: + - For [`INSERT`](insert.html) and [`UPDATE`](update.html), the current state of the row inserted or updated. + - For [`DELETE`](delete.html), `null`. + +For example: + +Statement | Response +-----------------------------------------------+----------------------------------------------------------------------- +`INSERT INTO office_dogs VALUES (1, 'Petee');` | JSON: `[1] {"after": {"id": 1, "name": "Petee"}}`
Avro: `{"id":1} {"id":1,"name":{"string":"Petee"}}` +`DELETE FROM office_dogs WHERE name = 'Petee'` | JSON: `[1] {"after": null}`
Avro: `{"id":1} {null}` + ## Examples ### Create a changefeed @@ -153,6 +169,8 @@ Use the `high_water_timestamp` to start the new changefeed: > CREATE CHANGEFEED FOR TABLE name INTO 'kafka//host:port' WITH cursor = ; ~~~ +Note that because the cursor is provided, the initial scan is not performed. + ## See also - [Change Data Capture](change-data-capture.html) From 6fc28e8c6fee7c1ab5a644fbc38ea2501f72469a Mon Sep 17 00:00:00 2001 From: Lauren Date: Wed, 27 Mar 2019 12:39:33 -0400 Subject: [PATCH 02/13] Add table that maps CRDB types to Avro types --- v19.1/create-changefeed.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/v19.1/create-changefeed.md b/v19.1/create-changefeed.md index cc6f5e8c8bf..1b67fe35666 100644 --- a/v19.1/create-changefeed.md +++ b/v19.1/create-changefeed.md @@ -44,7 +44,7 @@ Option | Value | Description `resolved` | [`INTERVAL`](interval.html) | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.

Example: `resolved='10s'` `envelope` | `key_only` / `wrapped` | Use `key_only` to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: `envelope=wrapped` `cursor` | [Timestamp](as-of-system-time.html#parameters) | Emits any changes after the given timestamp, but does not output the current state of the table first. If `cursor` is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

When starting a changefeed at a specific `cursor`, the `cursor` cannot be before the configured garbage collection window (see [`gc.ttlseconds`](configure-replication-zones.html#replication-zone-variables)) for the table you're trying to follow; otherwise, the changefeed will error. Because the default garbage collection window is 25 hours, by default you cannot create a changefeed that starts more than 25 hours in the past.

`cursor` can be used to [start a new changefeed where a previous changefeed ended.](#start-a-new-changefeed-where-another-ended)

Example: `CURSOR=1536242855577149065.0000000000` -`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for [Avro is limited and experimental](#avro-limitations).

Default: `format=json`. +`format` | `json` / `experimental_avro` | Format of the emitted record. Currently, support for [Avro is limited and experimental](#avro-limitations). For mappings of CockroachDB types to Avro types, [see the table below](#avro-types).

Default: `format=json`. `confluent_schema_registry` | Schema Registry address | The [Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html#sr) address is required to use `experimental_avro`. #### Avro limitations @@ -60,6 +60,25 @@ Currently, support for Avro is limited and experimental. Below is a list of unsu - [`TIME`, `DATE`, `INTERVAL`](https://github.com/cockroachdb/cockroach/issues/32472), [`UUID`, `INET`](https://github.com/cockroachdb/cockroach/issues/34417), [`ARRAY`](https://github.com/cockroachdb/cockroach/issues/34420), [`JSONB`](https://github.com/cockroachdb/cockroach/issues/34421), `BIT`, and collated `STRING` are not supported in Avro yet. +#### Avro types + +Below is a mapping of CockroachDB types to Avro types: + +CockroachDB Type | Avro Type | Avro Logical Type +-----------------+-----------+--------------------- +[`INT`](int.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`BOOL`](bool.html) | [`BOOLEAN`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`FLOAT`](float.html) | [`DOUBLE`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`STRING`](string.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`DATE`](date.html) | [`INT`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`DATE`](https://avro.apache.org/docs/1.8.1/spec.html#Date) +[`TIME`](time.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29) +[`TIMESTAMP`](timestamp.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29) +[`TIMESTAMPTZ`](timestamp.html) | [`LONG`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`TIME-MICROS`](https://avro.apache.org/docs/1.8.1/spec.html#Time+%28microsecond+precision%29) +[`DECIMAL`](decimal.html) | [`BYTES`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | [`DECIMAL`](https://avro.apache.org/docs/1.8.1/spec.html#Decimal) +[`UUID`](uuid.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`INET`](inet.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | +[`JSONB`](jsonb.html) | [`STRING`](https://avro.apache.org/docs/1.8.1/spec.html#schema_primitive) | + ## Responses The messages (i.e., keys and values) emitted to a Kafka topic are composed of the following: From 8627b7d99780a8d783fcf428bff48d68b54813d7 Mon Sep 17 00:00:00 2001 From: Lauren Date: Thu, 28 Mar 2019 11:43:38 -0400 Subject: [PATCH 03/13] Add debug info. Add Kafka query parameters --- v19.1/change-data-capture.md | 16 ++++++++++++++++ v19.1/create-changefeed.md | 28 ++++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/v19.1/change-data-capture.md b/v19.1/change-data-capture.md index 8d6dcc97164..280e535a6b2 100644 --- a/v19.1/change-data-capture.md +++ b/v19.1/change-data-capture.md @@ -209,6 +209,22 @@ Changefeed progress is exposed as a high-water timestamp that advances as the ch You can use the high-water timestamp to [start a new changefeed where another ended](create-changefeed.html#start-a-new-changefeed-where-another-ended). 
{{site.data.alerts.end}} +### Debug a changefeed + +{{site.data.alerts.callout_info}} +Debugging is only available for enterprise changefeeds. +{{site.data.alerts.end}} + +For changefeeds connected to Kafka, use log information to debug connection issues (i.e., `kafka: client has run out of available brokers to talk to (Is your cluster reachable?)`). Debug by looking for lines in the logs with `[kafka-producer]` in them: + +~~~ +I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123 [kafka-producer] Initializing new client +I190312 18:56:53.535714 585 vendor/github.com/Shopify/sarama/client.go:724 [kafka-producer] client/metadata fetching metadata for all topics from broker localhost:9092 +I190312 18:56:53.536730 569 vendor/github.com/Shopify/sarama/broker.go:148 [kafka-producer] Connected to broker at localhost:9092 (unregistered) +I190312 18:56:53.537661 585 vendor/github.com/Shopify/sarama/client.go:500 [kafka-producer] client/brokers registered new broker #0 at 172.16.94.87:9092 +I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170 [kafka-producer] Successfully initialized new client +~~~ + ## Usage examples ### Create a core changefeed diff --git a/v19.1/create-changefeed.md b/v19.1/create-changefeed.md index 1b67fe35666..b0de3487375 100644 --- a/v19.1/create-changefeed.md +++ b/v19.1/create-changefeed.md @@ -29,13 +29,37 @@ Changefeeds can only be created by superusers, i.e., [members of the `admin` rol Parameter | Description ----------|------------ `table_name` | The name of the table (or tables in a comma separated list) to create a changefeed for. -`sink` | The location of the configurable sink. The scheme of the URI indicates the type; currently, only `kafka`. There are query parameters that vary per type. Currently, the `kafka` scheme only has `topic_prefix`, which adds a prefix to all of the topic names.

Sink URI scheme: `'[scheme]://[host]:[port][?topic_prefix=[foo]]'`

For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`. +`sink` | The location of the configurable sink. The scheme of the URI indicates the type; currently, only `kafka`. For more information, see [Kafka query parameters](#kafka-query-parameters) below. `option` / `value` | For a list of available options and their values, see [Options](#options) below. - +### Kafka query parameters + +The sink URI scheme for Kafka follows the basic format of: + +~~~ +'kafka://[host]:[port][?query_parameters]' +~~~ + +For example: + +~~~ +'kafka://broker.address.com:9092?tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones' +~~~ + +Query parameters include: + +Parameter | Value | Description +----------+-------+--------------- +`topic_prefix` | [`STRING`](string.html) | Adds a prefix to all of the topic names.

For example, `CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_'` would emit rows under the topic `bar_foo` instead of `foo`. +`tls_enabled=true` | [`BOOL`](bool.html) | If `true`, use a Transport Layer Security (TLS) connection. This can be used with a `ca_cert` (see below). +`ca_cert` | [`STRING`](string.html) | The base64-encoded `ca_cert` file.

Note: To encode your `ca.cert`, run `base64 -w 0 ca.cert`. +`sasl_enabled` | [`BOOL`](bool.html) | If `true`, use Simple Authentication and Security Layer (SASL) to authenticate. This requires a `sasl_user` and `sasl_password` (see below). +`sasl_user` | [`STRING`](string.html) | Your SASL username. +`sasl_password` | [`STRING`](string.html) | Your SASL password. + ### Options Option | Value | Description From f7c4ec0530ee2fe2874c2e4a75abdad841aa1d70 Mon Sep 17 00:00:00 2001 From: Lauren Date: Thu, 28 Mar 2019 13:33:10 -0400 Subject: [PATCH 04/13] Add cloud storage sink --- v19.1/change-data-capture.md | 107 +++++++++++++++++++++++++++++++++++ v19.1/create-changefeed.md | 32 +++++++++-- 2 files changed, 133 insertions(+), 6 deletions(-) diff --git a/v19.1/change-data-capture.md b/v19.1/change-data-capture.md index 280e535a6b2..bc58bdb49f5 100644 --- a/v19.1/change-data-capture.md +++ b/v19.1/change-data-capture.md @@ -743,6 +743,113 @@ In this example, you'll set up a changefeed for a single-node cluster that is co $ ./bin/confluent stop ~~~ +### Create a changefeed connected to a cloud storage sink + +{{site.data.alerts.callout_info}} +`CREATE CHANGEFEED` is an [enterprise-only](enterprise-licensing.html) feature. For the core version, see [the `CHANGEFEED FOR` example above](#create-a-core-changefeed). +{{site.data.alerts.end}} + +{% include {{ page.version.version }}/misc/experimental-warning.md %} + +New in v19.1: In this example, you'll set up a changefeed for a single-node cluster that is connected to an AWS sink. Note that you can set up changefeeds for any of [these cloud storages](create-changefeed.html#cloud-storage-sink). + +1. If you do not already have one, [request a trial enterprise license](enterprise-licensing.html). + +2. In a terminal window, start `cockroach`: + + {% include copy-clipboard.html %} + ~~~ shell + $ cockroach start --insecure --listen-addr=localhost --background + ~~~ + +3. As the `root` user, open the [built-in SQL client](use-the-built-in-sql-client.html): + + {% include copy-clipboard.html %} + ~~~ shell + $ cockroach sql --insecure + ~~~ + +4. Set your organization name and [enterprise license](enterprise-licensing.html) key that you received via email: + + {% include copy-clipboard.html %} + ~~~ shell + > SET CLUSTER SETTING cluster.organization = ''; + ~~~ + + {% include copy-clipboard.html %} + ~~~ shell + > SET CLUSTER SETTING enterprise.license = ''; + ~~~ + +5. Enable the `kv.rangefeed.enabled` [cluster setting](cluster-settings.html): + + {% include copy-clipboard.html %} + ~~~ sql + > SET CLUSTER SETTING kv.rangefeed.enabled = true; + ~~~ + +6. Create a database called `cdc_demo`: + + {% include copy-clipboard.html %} + ~~~ sql + > CREATE DATABASE cdc_demo; + ~~~ + +7. Set the database as the default: + + {% include copy-clipboard.html %} + ~~~ sql + > SET DATABASE = cdc_demo; + ~~~ + +8. Create a table and add data: + + {% include copy-clipboard.html %} + ~~~ sql + > CREATE TABLE office_dogs ( + id INT PRIMARY KEY, + name STRING); + ~~~ + + {% include copy-clipboard.html %} + ~~~ sql + > INSERT INTO office_dogs VALUES + (1, 'Petee'), + (2, 'Carl'); + ~~~ + + {% include copy-clipboard.html %} + ~~~ sql + > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1; + ~~~ + +9. 
Start the changefeed: + + {% include copy-clipboard.html %} + ~~~ sql + > CREATE CHANGEFEED FOR TABLE office_dogs INTO 'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ' with updated, resolved='10s'; + ~~~ + + ~~~ + job_id + +--------------------+ + 360645287206223873 + (1 row) + ~~~ + + This will start up the changefeed in the background and return the `job_id`. The changefeed writes to AWS. + +10. Monitor your changefeed on the Admin UI (http://localhost:8080/#/metrics/changefeeds/cluster). For more information, see Changefeeds Dashboard. + +11. When you are done, exit the SQL shell (`\q`). + +12. To stop `cockroach`, run: + + {% include copy-clipboard.html %} + ~~~ shell + $ cockroach quit --insecure + ~~~ + ## Known limitations {% include {{ page.version.version }}/known-limitations/cdc.md %} diff --git a/v19.1/create-changefeed.md b/v19.1/create-changefeed.md index b0de3487375..a16d1f0d6dd 100644 --- a/v19.1/create-changefeed.md +++ b/v19.1/create-changefeed.md @@ -29,24 +29,28 @@ Changefeeds can only be created by superusers, i.e., [members of the `admin` rol Parameter | Description ----------|------------ `table_name` | The name of the table (or tables in a comma separated list) to create a changefeed for. -`sink` | The location of the configurable sink. The scheme of the URI indicates the type; currently, only `kafka`. For more information, see [Kafka query parameters](#kafka-query-parameters) below. +`sink` | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see [Sink URI](#sink-uri) below. `option` / `value` | For a list of available options and their values, see [Options](#options) below. -### Kafka query parameters +### Sink URI -The sink URI scheme for Kafka follows the basic format of: +The sink URI follows the basic format of: ~~~ -'kafka://[host]:[port][?query_parameters]' +'[scheme]://[host]:[port]?[query_parameters]' ~~~ -For example: +The `scheme` can be [`kafka`](#kafka) or any [cloud storage sink](#cloud-storage-sink). + +#### Kafka + +Example of a Kafka sink URI: ~~~ -'kafka://broker.address.com:9092?tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones' +'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones' ~~~ Query parameters include: @@ -60,6 +64,22 @@ Parameter | Value | Description `sasl_user` | [`STRING`](string.html) | Your SASL username. `sasl_password` | [`STRING`](string.html) | Your SASL password. +#### Cloud storage sink + +Example of a cloud storage sink (i.e., AWS) URI: + +~~~ +'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ' +~~~ + +{{site.data.alerts.callout_info}} +The `scheme` for a cloud storage sink should be prepended with `experimental-`. 
+{{site.data.alerts.end}} + +Any of the cloud storages below can be used as a sink: + +{% include {{ page.version.version }}/misc/external-urls.md %} + ### Options Option | Value | Description From dde2a95524799f6ac3cab6382969b548ed3e5153 Mon Sep 17 00:00:00 2001 From: Lauren Date: Thu, 28 Mar 2019 14:21:26 -0400 Subject: [PATCH 05/13] Add new S3 parameter (#4196) Minor edits / links --- _includes/v19.1/misc/external-urls.md | 4 ++-- v19.1/change-data-capture.md | 14 ++++++-------- v19.1/changefeed-for.md | 2 +- v19.1/create-changefeed.md | 25 ++++++++++++++++++++++--- 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/_includes/v19.1/misc/external-urls.md b/_includes/v19.1/misc/external-urls.md index cceca22284a..62d2a2591b1 100644 --- a/_includes/v19.1/misc/external-urls.md +++ b/_includes/v19.1/misc/external-urls.md @@ -4,12 +4,12 @@ | Location | Scheme | Host | Parameters | |-------------------------------------------------------------+-------------+--------------------------------------------------+----------------------------------------------------------------------------| -| Amazon S3 | `s3` | Bucket name | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` | +| Amazon S3 | `s3` | Bucket name | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN ` | | Azure | `azure` | N/A (see [Example file URLs](#example-file-urls) | `AZURE_ACCOUNT_KEY`, `AZURE_ACCOUNT_NAME` | | Google Cloud [1](#considerations) | `gs` | Bucket name | `AUTH` (optional): can be `default` or `implicit` | | HTTP [2](#considerations) | `http` | Remote host | N/A | | NFS/Local [3](#considerations) | `nodelocal` | N/A (see [Example file URLs](#example-file-urls) | N/A | -| S3-compatible services [4](#considerations) | `s3` | Bucket name | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION` [5](#considerations) (optional), `AWS_ENDPOINT` | +| S3-compatible services [4](#considerations) | `s3` | Bucket name | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN, `AWS_REGION` [5](#considerations) (optional), `AWS_ENDPOINT` | {{site.data.alerts.callout_info}} The location parameters often contain special characters that need to be URI-encoded. Use Javascript's [encodeURIComponent](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/encodeURIComponent) function or Go language's [url.QueryEscape](https://golang.org/pkg/net/url/#QueryEscape) function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters. diff --git a/v19.1/change-data-capture.md b/v19.1/change-data-capture.md index bc58bdb49f5..c682b6754b9 100644 --- a/v19.1/change-data-capture.md +++ b/v19.1/change-data-capture.md @@ -124,9 +124,7 @@ To enable rangefeeds for an existing changefeed, you must also restart the chang The `kv.closed_timestamp.target_duration` [cluster setting](cluster-settings.html) can be used with push changefeeds. Resolved timestamps will always be behind by at least this setting's duration; however, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance. -## Configure a changefeed (Core) - -### Create +## Create a changefeed (Core) New in v19.1: To create a core changefeed: @@ -145,7 +143,7 @@ To create a changefeed: {% include copy-clipboard.html %} ~~~ sql -> CREATE CHANGEFEED FOR TABLE name INTO 'kafka://host:port'; +> CREATE CHANGEFEED FOR TABLE name INTO 'schema://host:port'; ~~~ For more information, see [`CREATE CHANGEFEED`](create-changefeed.html). 
@@ -305,7 +303,7 @@ I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170 [kaf $ cockroach quit --insecure ~~~ -### Create a core changefeed with Avro +### Create a core changefeed in Avro New in v19.1: In this example, you'll set up a core changefeed for a single-node cluster that emits [Avro](https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format) records. @@ -575,10 +573,10 @@ In this example, you'll set up a changefeed for a single-node cluster that is co $ ./bin/confluent stop ~~~ -### Create a changefeed in Avro connected to Kafka +### Create a changefeed connected to Kafka in Avro {{site.data.alerts.callout_info}} -`CREATE CHANGEFEED` is an [enterprise-only](enterprise-licensing.html) feature. For the core version, see [the `CHANGEFEED FOR` example above](#create-a-core-changefeed-with-avro). +`CREATE CHANGEFEED` is an [enterprise-only](enterprise-licensing.html) feature. For the core version, see [the `CHANGEFEED FOR` example above](#create-a-core-changefeed-in-avro). {{site.data.alerts.end}} In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink and emits [Avro](https://avro.apache.org/docs/1.8.2/spec.html) records. @@ -839,7 +837,7 @@ In this example, you'll set up a changefeed for a single-node cluster that is co This will start up the changefeed in the background and return the `job_id`. The changefeed writes to AWS. -10. Monitor your changefeed on the Admin UI (http://localhost:8080/#/metrics/changefeeds/cluster). For more information, see Changefeeds Dashboard. +10. Monitor your changefeed on the Admin UI (http://localhost:8080/#/metrics/changefeeds/cluster). For more information, see [Changefeeds Dashboard](admin-ui-cdc-dashboard.html). 11. When you are done, exit the SQL shell (`\q`). diff --git a/v19.1/changefeed-for.md b/v19.1/changefeed-for.md index f6bc6e4db0e..fc736a534f6 100644 --- a/v19.1/changefeed-for.md +++ b/v19.1/changefeed-for.md @@ -99,7 +99,7 @@ foo,\000\000\000\000\001\002\024,\000\000\000\000\002\002\002\024 To stop streaming the changefeed, enter **CTRL+C** into the terminal where the changefeed is running. -For more information on how to create a core changefeed, see [Change Data Capture](change-data-capture.html#create-a-core-changefeed-with-avro). +For more information on how to create a core changefeed, see [Change Data Capture](change-data-capture.html#create-a-core-changefeed-in-avro).
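
As a worked sketch that ties together the pieces documented above (the `CREATE CHANGEFEED` syntax, the Kafka sink URI query parameters, and the `updated` and `resolved` options), the following statement is illustrative only: the table name, broker address, SASL credentials, and base64-encoded certificate are placeholder values, not output from a real cluster.

~~~ sql
> CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://broker.address.com:9092?topic_prefix=office_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
  WITH updated, resolved='10s';
~~~

With this combination, every emitted row includes an "updated" timestamp, resolved timestamp messages are emitted to each Kafka partition no more often than every 10 seconds, and changes to `office_dogs` are published to the `office_office_dogs` topic because of the `topic_prefix` parameter.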