diff --git a/docs/RFCS/20201119_streaming_cluster_to_cluster.md b/docs/RFCS/20201119_streaming_cluster_to_cluster.md
index 069c12cc410f..a75c9a321bad 100644
--- a/docs/RFCS/20201119_streaming_cluster_to_cluster.md
+++ b/docs/RFCS/20201119_streaming_cluster_to_cluster.md
@@ -74,7 +74,7 @@ From a distance, this is similar to OS virtualization: the host or hypervisor ca
As discussed above, the tenant primitive encapsulates all potentially related data/state/etc in one well-defined prefix, and the ability to start and stop tenant processes provides the required "offline" destination keys-span in an otherwise "online" cluster.
-However enterprise customers with **non-tenant** deployments want to use cluster-to-cluster replication. While it is possible that they may someday migrate to run their workloads as tenants of multi-tenant clusters (or indeed we may opt to run _all clusters _as_ _"multi-tenant" clusters even if they just host one tenant), the multi-tenancy features are not yet ready for on-premise customer deployments, and are not likely to be in the immediate term. Meanwhile there is active demand for cluster-to-cluster replication from these customers _now._
+However, enterprise customers with **non-tenant** deployments want to use cluster-to-cluster replication. While it is possible that they may someday migrate to run their workloads as tenants of multi-tenant clusters (or indeed we may opt to run _all_ clusters as "multi-tenant" clusters even if they just host one tenant), the multi-tenancy features are not yet ready for on-premise customer deployments, and are not likely to be in the immediate term. Meanwhile, there is active demand for cluster-to-cluster replication from these customers _now._
Given that the host/tenant separation is what allowed side-stepping the online/offline contradiction, one potential solution for replicating non-tenant clusters is to invert the above online host cluster, offline guest tenant design. More specifically, by booting the processes for the second cluster in a special "recovery mode", where they read and write only within the span of a designated "recovery tenant" key prefix, then only that tenant is actually "online" with respect to SQL processes, including those running the replication job, while the rest of the keyspace of that cluster is effectively left offline, and can thus ingest replicated data.
@@ -87,8 +87,9 @@ While there will certainly be subtleties to resolve in this configuration that w
Replicating a tenant requires two main pieces:
-1. A stream of all KV changes made to the tenant span.
-2. A job ingesting that stream into a tenant.
+1. a stream of all KV changes made to the tenant span.
+2. a destination cluster job that ingests that stream into a tenant while tracking ingestion progress.
+3. a source cluster job that monitors the health of the stream and cleans up resources after failure or completion.
These two pieces may run concurrently but physically separated e.g. to maintain a "hot standby" in a second datacenter, where the copy is ingesting changes as soon as they are emitted by source and ready to be brought online at a moment's notice. They could also instead be temporarily separated, i.e. using the persisted stream to replay later.
@@ -121,7 +122,13 @@ The resolved timestamp is an event which indicates that a stream has emitted all
Operators may wish to stream directly between clusters, either to reduce operational complexity and costs by eliminating the need to have an external storage intermediary or to minimize latency associated with buffering and flushing to storage.
-To stream directly between clusters, nodes in the producing cluster can allocate "outboxes" or fixed size buffers of emitted stream entries. It can then provide the addresses of these outboxes and expose them via an API so that the consuming cluster can dial them directly to fetch their contents. A consumer connected to a given outbox could hold its connection open and receive emitted events with minimal latency.
+To stream directly between clusters, one idea is to let nodes in the producing cluster allocate "outboxes", or fixed-size buffers, of emitted stream entries. The producing cluster can then expose the addresses of these outboxes via an API so that the consuming cluster can dial them directly to fetch their contents. A consumer connected to a given outbox could hold its connection open and receive emitted events with minimal latency.
+Consumers would trim these outboxes after consumption, and the tail of each outbox would serve as a checkpoint. This has two major drawbacks:
+
+1. The "outbox" table can have high write and read contention.
+2. It is not natural to keep checkpoints in the source cluster, since only the consuming cluster knows what it has flushed all the way to disk and therefore where any given partition needs to resume from.
+
+Therefore, we can make the source cluster "stateless": first, generate a serialized physical plan that tells each consumer partition which source partitions it needs to talk to. Then the consuming cluster's partitions individually dial source cluster nodes and provide them with those serialized fragments of the larger flow plan, which each source node then plans and runs independently.
#### Streaming to/from Files
@@ -148,10 +155,11 @@ A cluster continuously ingesting a stream from files would need to poll to deter
The streaming client should be able to answer requests to:
-* Get the corresponding generation and its topology for a given timestamp
-* Start reading a generation’s partition at a given timestamp (i.e. consume the events of the stream)
-* Send a notification of a new generation, as well as the start timestamp of that generation
-* Drain all events from the partitions of a generation until a given timestamp has been resolved (used when a notification of a new generation has been received.)
+* Create: create a new stream instance, which is a producer job in the source cluster that protects the tenant key span.
+* Plan: start a new generation and receive its partition topology for a given timestamp.
+* Subscribe: start reading a generation’s partition at a given timestamp (i.e. consume the events of the stream).
+* CutOver: drain all events from the partitions of a generation until a given timestamp has been resolved (used when a notification of a new generation has been received).
+* Complete: finish the stream replication and clean up all resources.
The API should be transparent to whether the streaming is directly cluster to cluster, or facilitated by an intermediary buffer.
@@ -178,9 +186,9 @@ The event which designates the restoring cluster to stop listening to the incomi
#### Buffered Ingestion
-Buffered ingestion would write the incoming streams to a buffer, and wait until all partitions have received data at least up to a given source-cluster resolved logical timestamp, and only then** flush that prefix of their buffer for that resolved timestamp** to their actual storage.
+Buffered ingestion would write the incoming streams to a buffer, and wait until all partitions have received data at least up to a given source-cluster resolved logical timestamp, and only then **flush that prefix of their buffer for that resolved timestamp** to their actual storage.
-Given that a partition of the stream could fall _arbitrarily_ behind or another could burst much more data for a given time period, this implies this buffering must be prepared to hold an _unbounded _amount of incoming data before it is allowed to flush it, and thus likely will need to be disk-backed or at least able to spill to disk, potentially increasing write-amplification in the steady-state of tailing the stream.
+Given that a partition of the stream could fall _arbitrarily_ behind or another could burst much more data for a given time period, this implies this buffering must be prepared to hold an _unbounded_ amount of incoming data before it is allowed to flush it, and thus likely will need to be disk-backed or at least able to spill to disk, potentially increasing write-amplification in the steady-state of tailing the stream.
It is worth considering what the buffered ingestion implementation could look like in a bit more detail. One proposed approach would have each node maintain a Pebble instance with the WAL disabled (and generally otherwise optimized for a write workload). (timestamp, KV) pairs would be added to the store keyed on their timestamp. Processing the event stream would behave as follows:
@@ -258,7 +266,7 @@ It is important to provide metrics for monitoring the health of the replicating
- What range or ranges are the lagging ones that are holding it back? Is it ingestion delay or is the stream lagging?
- What is the size of the pending ingestion buffers?
- How far behind are the ingested timestamps vs the received timestamps?
-- What is the size of the unconsumed portion of the outboxes in the producing cluster i.e. how far behind is the stream consumer?
+- How far behind is the stream consumer?
- For file-based streams, what's the latest flushed consistent timestamp/how far behind is it?
- If/when we integrate with BACKUPs (e.g. restore+replay), what's the size of the stream since last backup?
diff --git a/pkg/ccl/changefeedccl/avro_test.go b/pkg/ccl/changefeedccl/avro_test.go
index 57d54fc30ff5..b7ba4e26301e 100644
--- a/pkg/ccl/changefeedccl/avro_test.go
+++ b/pkg/ccl/changefeedccl/avro_test.go
@@ -508,9 +508,10 @@ func TestAvroSchema(t *testing.T) {
// The avro golden strings are in the textual format defined in the spec.
t.Run("value_goldens", func(t *testing.T) {
goldens := []struct {
- sqlType string
- sql string
- avro string
+ sqlType string
+ sql string
+ avro string
+ numRawBytes int
}{
{sqlType: `INT`, sql: `NULL`, avro: `null`},
{sqlType: `INT`,
@@ -631,6 +632,26 @@ func TestAvroSchema(t *testing.T) {
{sqlType: `switch`, // User-defined enum with values "open", "closed"
sql: `'open'`,
avro: `{"string":"open"}`},
+
+ // The following test cases document the way goavro encodes and decodes
+ // the "bytes" type. We'll need to keep this behavior as the default to
+ // avoid any breaking changes.
+ {sqlType: `BYTES`,
+ sql: `b'\xff'`,
+ avro: `{"bytes":"\u00FF"}`,
+ numRawBytes: 1},
+ {sqlType: `BYTES`,
+ sql: `'a'`,
+ avro: `{"bytes":"a"}`,
+ numRawBytes: 1},
+ {sqlType: `BYTES`,
+ sql: `b'\001\002\003\004\005\006\007\010\011\012\013'`,
+ avro: `{"bytes":"\u0001\u0002\u0003\u0004\u0005\u0006\u0007\b\t\n\u000B"}`,
+ numRawBytes: 11},
+ {sqlType: `BYTES`,
+ sql: `''`,
+ avro: `{"bytes":""}`,
+ numRawBytes: 0},
}
for _, test := range goldens {
@@ -645,6 +666,12 @@ func TestAvroSchema(t *testing.T) {
row, avroSchemaNoSuffix, "")
require.NoError(t, err)
textual, err := schema.textualFromRow(row)
+ if test.numRawBytes > 0 {
+ overhead := 4
+ binary, err := schema.BinaryFromRow(make([]byte, 0, test.numRawBytes+20), row.ForEachColumn())
+ require.NoError(t, err)
+ require.Equal(t, test.numRawBytes, len(binary)-overhead)
+ }
require.NoError(t, err)
// Trim the outermost {}.
value := string(textual[1 : len(textual)-1])