Replies: 2 comments 2 replies
-
I did a little research on CDC and wanted to update. Here's a brief subjective reaction:
The other aspect of CDC into Flow is deciding what we do with that data. Basically, we get a …
-
Here's an attempt at an interface definition for captures drivers. It's roughly in protobuf syntax, but I …
We start by creating our n captures shards, which each have an assigned range of a hashed …
For each stream in the …
For the state store, I think using rocksdb backed by a recovery log would definitely work, but it …
message Capture {
// Endpoint which this request is addressing.
string endpoint_name = 1;
// Endpoint type addressed by this request.
flow.EndpointType endpoint_type = 2;
// Driver-specific configuration, as an encoded JSON object.
string endpoint_config_json = 3;
// Collection to be captured into.
flow.CollectionSpec collection = 4;
// Projection configuration, keyed by the projection field name,
// with JSON-encoded and driver-defined configuration objects.
map<string, string> field_config_json = 5;
}
message ValidateRequest {
Capture capture = 1;
}
message ValidateResponse {
repeated string resource_path = 1;
// Optional hints that provide information on the appropriate number of reads from the remote
// system to execute in parallel. Flowctl will use these numbers in determining the appropriate
// number of captures shards to create, and/or to sanity check a user-provided value. For kafka,
// these both correspond to the number of partitions in the topic. For S3, we might use some
// heuristic based on the approximate volume of data that's in the bucket, or we could just
// hard-code some realistic upper limit.
uint32 min_parallelism_hint = 2;
uint32 max_parallelism_hint = 3;
}
// Represents a distinct source of data in a remote system, which will be consumed in order.
// For example, in Kafka each topic partition would be a Stream, since it must be consumed in order.
// In S3, each distinct object may be considered a separate stream.
message Stream {
// The canonical id of this stream.
// For Kafka, the identity of a Stream is a tuple of topic and partition. For S3, it's the object key.
string id = 1;
// The content length of the stream, if known. A negative number is interpreted as infinite (e.g.
// for a kafka partition).
sint64 content_length_hint = 2;
}
// The Flow runtime sends a ListStreamsRequest to the driver in order to discover streams that are
// available to be read.
message ListStreamsRequest {
Capture capture = 1;
// Flow will send the checkpoint value that was returned in the most recent ListStreamsResponse
// that has had all of its streams successfully read to completion. Here "all" means only the
// streams that are meant to be handled by the current Captures shard.
bytes checkpoint = 2;
}
// Sent by the Driver in response to a ListStreamsRequest, to inform the runtime of the distinct streams
// that may be read. The Driver returns a stream of these messages to allow for continued discovery
// of new streams.
message ListStreamsResponse {
// Optional checkpoint value that pertains _only_ to the list of streams (not related to ingest
// progress on any single stream). For S3, this would be the most recent `LastModified` timestamp
// from a stream in the list. This value will be passed back to the driver on future
// ListStreamsRequests only once all streams given in this message have been fully processed.
bytes checkpoint = 1;
// List of streams that are available to be read.
repeated Stream streams = 2;
}
// Sent by the runtime to initiate, resume, or stop reading from a particular stream.
message ReadRequest {
message Open {
Capture capture = 1;
bytes prev_checkpoint = 2;
string partition_id = 3;
}
// If non-null, then this is a request to initiate or resume reading from a stream.
Open open = 1;
// We might not need this. The idea was just to allow the drivers to gracefully stop reading from
// a stream whenever the runtime is stopping, for example because it's reacting to a newly applied
// catalog.
bool close = 2;
}
// Sent from the driver to the runtime to send both data and checkpoints related to a given stream.
message ReadResponse {
message TxnStart {
// Which protocol data will be returned in (json/v1, etc)
string protocol = 1;
}
// The first ReadResponse in the RPC must include a txn_start. All subsequent ReadResponse messages
// for the same RPC must not include txn_start.
TxnStart txn_start = 1;
// Actual data to import, which must match the expectations of the given protocol. For example, if
// the protocol is `csv/v1`, then data must be in CSV format. Additionally, data given here must
// begin and end at valid record boundaries. In other words, CSV data must not start or end in the
// middle of a row, and JSON data must begin with a `{` and end with a `}` (though it may contain
// many distinct records in between).
bytes data = 2;
// Checkpoint associated with the end of the data frame. For kafka, this would be the offset
// within the partition. For S3, this would be a combination of the offset within the file and the
// ETag associated with the file. The addition of the ETag allows the S3 driver to restart from
// the beginning when an object has been replaced.
bytes checkpoint = 3;
// May be set only on the final ReadResponse in a response stream to indicate that the stream has
// been removed from the remote system. The flow runtime will use this to prune any state that is
// currently stored for this stream after the import completes, regardless of whether the
// ingestion was successful or not.
bool deleted = 4;
}
// A Driver service defines the boundary between the Flow runtime (common to all captures) and the code
// that is specific to a particular type of external system.
// The overall usage is broken down into two separate areas, apply-time and runtime. The runtime
// manages a state store that's associated with each shard. If the shard is ever split, then the
// state will pass from the parent to all children.
//
// Apply-time:
// - Flowctl calls Validate and uses the results for, well, validation.
// - Flowctl creates a number of captures shards, which determines the base level of parallelism.
// - Assigns a keyspace range to each shard, which determines the streams that it will be
// responsible for.
//
// Runtime (on each shard):
// Discovery loop:
// - Runtime calls ListStreams, passing in the checkpoint from the last fully-processed
// ListStreamsResponse.
// - For each ListStreamsResponse, filter out streams that aren't assigned to the shard, add each
// assigned stream to the state store with a zeroed checkpoint if it's not already present.
// - Loop around and call ListStreams again
//
// Ingest loop:
// - Iterate all the streams in the state store
// - Filter out and delete streams that don't hash into this shard's key range. Non-assigned streams
// could be present if the state store was inherited from a parent shard that split.
// - `Read` and ingest each stream. See note below on handling finite vs infinite streams.
//
// Finite vs infinite streams:
// We need to be careful with parallelism _within_ each captures shard. Generally we don't want
// parallelism at this level, and would instead prefer to just create multiple Captures shards.
// For example, an S3 bucket listing might return thousands of objects that all hash to the current
// shard, and we must not try to import them all at once. Ideally, we'd use pipelined transactions
// to import each thing sequentially. But that breaks as soon as any one stream is effectively
// infinitely long, like a Kafka partition. For those, we need to import them concurrently if there
// are multiple that hash to the same shard. This would happen if, for example, you create 4 shards
// to import from 4 partitions and then a new partition is added. Or it might even happen if you
// just get unlucky and have multiple partitions hash into the same shard :/
// It would be nice if there were a more elegant way of handling this, so LMK if you think of any.
// But the current proposal is to essentially just allow for concurrency only for infinite streams,
// and ingest everything else sequentially.
service Driver {
// Called by flowctl as part of the apply process to validate a capture and determine the resource
// path.
rpc Validate(ValidateRequest) returns (ValidateResponse);
// Retrieves a list of Streams from the remote system, and continuously watches for new streams to
// be added. The runtime will keep a ListStreams RPC active at all times by re-connecting anytime
// the response stream is closed. Many systems will require polling in order to discover new
// streams, though. Other systems may not support multiple streams at all. And others may only
// support reading from streams that are explicitly named in the capture configuration (MQTT), and
// thus will not benefit from any dynamism here. An open question is whether we should help with
// such systems by adding an optional `recheck_after` duration field in the final
// `ListStreamsResponse`, so that the runtime can avoid calling ListStreams in these cases. I've
// left that out for now, but thinking it could be worth having.
rpc ListStreams(ListStreamsRequest) returns (stream ListStreamsResponse);
// Reads a particular Stream. The response stream will either return the actual stream data, or
// else indicate that the stream has either been deleted or has no new data available (for finite
// streams).
// - If the stream has since been deleted, there will only be a single ReadResponse with
// `deleted=true`.
// - If the stream is finite and has no new data available, then the driver will
// return a single `ReadResponse` with no `data` and whatever `checkpoint` it wants to store (it
// may update it or just return the same checkpoint value).
// - If there is data to be read, the first
// ReadResponse must include `txn_start`, which tells the Runtime which protocol to use. The
// runtime then ingests all the data in the stream, updating the checkpoint along the way, until
// the stream is closed.
//
// Technically, we can be as loose or restrictive as we want to be with the `data` and
// `checkpoint` in a `ReadResponse`. For example, even when there's data available to read there's
// technically no reason we couldn't support sending `data` with no `checkpoint` (to keep the
// transaction open), or a `checkpoint` with no `data` (to commit an open transaction, or just
// update the state in rocksdb). My inclination would be to always require that both are present,
// though, since we can always loosen it later if we need.
rpc Read(stream ReadRequest) returns (stream ReadResponse);
}
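To make the runtime side of this concrete, here's a rough Go sketch of the per-shard discovery loop described in the Driver comments above. The driverClient, streamListing, and stateStore interfaces and the hash function are stand-ins invented for illustration; the message types are assumed to be the Go structs generated from the protobuf above.

package capture

import (
	"context"
	"io"
)

// Stand-in interfaces for illustration only; in practice these would be the
// generated gRPC client for the Driver service and the shard's rocksdb-backed
// state store.
type driverClient interface {
	ListStreams(ctx context.Context, req *ListStreamsRequest) (streamListing, error)
}
type streamListing interface {
	Recv() (*ListStreamsResponse, error)
}
type stateStore interface {
	ListingCheckpoint() []byte
	StageListingCheckpoint(checkpoint []byte)
	AddStreamIfMissing(id string)
}

// discoveryLoop sketches the per-shard discovery loop: list streams, keep only
// those that hash into this shard's assigned key range, and track them with a
// zeroed checkpoint if they aren't already present in the state store.
func discoveryLoop(ctx context.Context, driver driverClient, store stateStore,
	capture *Capture, rangeBegin, rangeEnd uint32, hash func(string) uint32) error {
	for {
		listing, err := driver.ListStreams(ctx, &ListStreamsRequest{
			Capture:    capture,
			Checkpoint: store.ListingCheckpoint(),
		})
		if err != nil {
			return err
		}
		for {
			resp, err := listing.Recv()
			if err == io.EOF {
				break // Response stream closed; loop around and re-connect.
			} else if err != nil {
				return err
			}
			for _, s := range resp.Streams {
				if h := hash(s.Id); h < rangeBegin || h > rangeEnd {
					continue // Not assigned to this shard.
				}
				store.AddStreamIfMissing(s.Id)
			}
			// Staged only; promoted once every stream in this response has been
			// read to completion.
			store.StageListingCheckpoint(resp.Checkpoint)
		}
	}
}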
-
This is meant to get the ball rolling on the design for Captures. As a first step, this collects
requirements and relevant information on a representative selection of systems we want to capture
from.
Regardless of the type of system, all Captures will have some associated state that represents
some sort of checkpoint(s). These checkpoints are something that the drivers would need to use in order to
resume fetching data from the remote system from wherever it left off. Some systems (e.g. Kafka)
have the ability to store this state in the external system, but for most systems this is
impractical or impossible. That leaves Flow/Gazette as the system of record for this state, at least
for some cases. The summaries that follow all try to address considerations related to this.
Kafka
There are a number of different options for Kafka libraries and ways to use them. The first is
"Consumer Groups". With consumer groups, the kafka brokers manage the offsets for you, and all the
client needs to do is supply a group id (as a key for broker-managed offsets and such), and a
"topic". Consumer groups automatically distribute all the partitions of a topic among all the
instances within a consumer group. Consumers have control over when their read offsets are
committed. You can either use auto-commit, or manually commit. If you use auto-commit, then the API
gives you an event with the offsets after each successful commit, which happens whenever the broker
feels like it. If you manually commit, then you can choose to commit either before or after message
processing, which basically corresponds to at-most-once vs at-least-once processing guarantees.
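For reference, here's what manual-commit consumer-group usage could look like with the Segment kafka-go library; the broker address, group id, and topic are placeholders, and committing after processing gives the at-least-once behavior described above.

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Consumer-group mode: the brokers track offsets under the group id and
	// distribute the topic's partitions among all readers sharing that id.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker address
		GroupID: "flow-capture-example",     // placeholder group id
		Topic:   "example-topic",            // placeholder topic
	})
	defer r.Close()

	ctx := context.Background()
	for {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Fatal(err)
		}
		// ... process/ingest msg.Value here ...
		// Committing after processing is at-least-once; committing before (or
		// relying on auto-commit) trends toward at-most-once.
		if err := r.CommitMessages(ctx, msg); err != nil {
			log.Fatal(err)
		}
	}
}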
The other way of interacting with Kafka is using the lower-level API, where you create a Reader
for each specific topic-partition combination you want to read. The official Confluent Golang driver
doesn't expose this type of API, but the Segment Kafka driver does. This API is maybe better
suited to our needs because offsets could be entirely managed by Flow and committed to the recovery
log along with each transaction. A captures driver based on this API might just need to pass along
the offsets with each batch of messages, and the Flow runtime could pass back the most recent
offsets when it initializes the captures shards.
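A rough sketch of that lower-level usage with segmentio/kafka-go, where the caller supplies the partition and a starting offset restored from Flow's recovery log; the broker address and ingest callback are placeholders.

package main

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// readPartition resumes a single topic-partition from a caller-owned offset
// and hands each message plus its offset to the ingest callback, so the
// offset can be committed in the same transaction as the ingested documents.
func readPartition(ctx context.Context, topic string, partition int,
	startOffset int64, ingest func(value []byte, offset int64) error) error {

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"}, // placeholder broker address
		Topic:     topic,
		Partition: partition,
	})
	defer r.Close()

	// Offsets are owned entirely by the caller, not by a consumer group.
	if err := r.SetOffset(startOffset); err != nil {
		return err
	}
	for {
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			return err
		}
		if err := ingest(msg.Value, msg.Offset); err != nil {
			return err
		}
	}
}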
Parallel processing of captures adds another dimension to this. With consumer groups, the Kafka
broker manages distributing topic partitions among all the consumer instances in each group. Thus
you can change the number of instances at any time and have the partitions get automatically
re-distributed among readers. Using the low level APIs would require that we first determine how
many partitions are in each topic, and then distribute them among all shards for a given Capture.
Contrary to my previous belief, Kafka topics can have partitions added at any time. There is just
the significant caveat that Kafka does nothing to help with redistribution of data among logical
partitions. But topics that don't treat partitions as logical partitions may add new ones at any
time.
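Determining the partition count is cheap with kafka-go, so a driver could re-check it periodically (or in its stream-discovery loop) rather than only at catalog build time. A small sketch, with placeholder broker and topic:

package main

import (
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "localhost:9092") // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Each returned Partition would become a separately-readable stream.
	partitions, err := conn.ReadPartitions("example-topic") // placeholder topic
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		fmt.Printf("topic=%s partition=%d\n", p.Topic, p.ID)
	}
}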
S3
This is perhaps one of the most interesting use cases for captures because it seems to have so much
conceptual overlap with many other systems that we'd want to capture from. Not just other cloud
storage providers, but also stuff like SFTP, WebDAV, RSS, and anything that's REST based. You're
surely already familiar with the API, so I won't re-hash that.
The basic idea of an S3 captures driver would be to poll the ListObjects API to discover available
objects, and to use either the LastModified timestamp or ETag to detect which files have been
completely imported already. We'd need to store the offset within a file if we want the ability
to break up individual files into multiple transactions, which seems necessary given that files may
be extremely large.
Using the LastModified timestamp allows for filtering files on the server side, and it can also be
used as a checkpoint. The danger is of course that there's no total ordering of timestamps (multiple
different files can have the same LastModified), so a checkpoint based on this would possibly need
to store multiple key-timestamp-boolean tuples, where the boolean indicates whether the key has been
imported.
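Here's a rough Go sketch of that listing pass using aws-sdk-go, with one possible checkpoint shape: a LastModified watermark plus the set of keys at that watermark that are already imported. The bucket name and the exact checkpoint layout are assumptions, and the timestamp filtering here is done client-side for simplicity.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// listingCheckpoint is an illustrative checkpoint: because many objects can
// share a LastModified timestamp, we track a watermark plus the keys at that
// watermark which have already been imported.
type listingCheckpoint struct {
	Watermark time.Time
	Imported  map[string]bool
}

func listNewObjects(bucket string, cp listingCheckpoint) ([]string, error) {
	svc := s3.New(session.Must(session.NewSession()))

	var pending []string
	err := svc.ListObjectsV2Pages(&s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, obj := range page.Contents {
			key := aws.StringValue(obj.Key)
			mod := aws.TimeValue(obj.LastModified)
			switch {
			case mod.Before(cp.Watermark):
				continue // imported during an earlier pass
			case mod.Equal(cp.Watermark) && cp.Imported[key]:
				continue // same timestamp, but this key is already done
			default:
				pending = append(pending, key)
			}
		}
		return true // keep paging
	})
	return pending, err
}

func main() {
	keys, err := listNewObjects("example-bucket", listingCheckpoint{
		Imported: map[string]bool{},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(keys)
}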
A final mention regarding S3 is the "watch" support. AWS lets you easily get notifications whenever
a bucket is modified, which may look pretty tempting at first. But using this stuff requires
additional configuration in the bucket, permissions, SQS or SNS topics, and cost. So I haven't
really taken a detailed look at this approach, since I don't think it's worth pursuing. But happy to
take another look if you think there's a promising approach I'm not aware of.
Kinesis
The Kinesis APIs are pretty similar to what's available for Kafka, but the terminology is a little
different: (Kafka->Kinesis) topic->stream and partition->shard. There's something akin to
offsets. The high level consumer APIs are quite different, though, as kinesis itself does not handle
distributing shards among active consumer processes. That's done on the client side, and it requires
the use of a separate storage layer (usually dynamodb), which has to be provisioned separately.
Probably the most significant thing about Kinesis is that the high-level consumer API seems
impractical for our needs, but the low-level API seems remarkably similar to the Kafka "Reader" API.
Another difference with Kinesis is that individual Kinesis shards are limited to 2MiB/Sec read
throughput. Kafka users tend not to go overboard with partitioning because reads from a single
partition are super cheap and fast. But with Kinesis, it will be especially important that we read
from lots of shards in parallel efficiently.
Also streams may have shards split or merged at any time. A Kinesis captures driver would need to
handle these events and update its subscriptions accordingly.
The low-level Kinesis API is really straightforward. You register a consumer and make requests to read
from individual shards, which stream back data from at most 5 minutes before you need to reconnect.
There's some specific request rate limits that we'll have to account for, but nothing crazy.
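As a concrete illustration, here's a sketch of reading one shard with the plain GetShardIterator/GetRecords polling path from aws-sdk-go (the simpler alternative to the registered-consumer streaming API described above); sequence numbers play the role that offsets do for Kafka, and the stream/shard names and ingest callback are placeholders.

package capture

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// readShard resumes a single Kinesis shard from a checkpointed sequence number
// and hands each record to the ingest callback along with the sequence number
// to store as the new checkpoint.
func readShard(streamName, shardID, lastSequence string,
	ingest func(data []byte, sequence string) error) error {

	svc := kinesis.New(session.Must(session.NewSession()))

	iterOut, err := svc.GetShardIterator(&kinesis.GetShardIteratorInput{
		StreamName:             aws.String(streamName),
		ShardId:                aws.String(shardID),
		ShardIteratorType:      aws.String("AFTER_SEQUENCE_NUMBER"),
		StartingSequenceNumber: aws.String(lastSequence),
	})
	if err != nil {
		return err
	}
	iterator := iterOut.ShardIterator

	for iterator != nil {
		recs, err := svc.GetRecords(&kinesis.GetRecordsInput{ShardIterator: iterator})
		if err != nil {
			return err
		}
		for _, r := range recs.Records {
			if err := ingest(r.Data, aws.StringValue(r.SequenceNumber)); err != nil {
				return err
			}
		}
		// A nil NextShardIterator means the shard has been closed (split/merged).
		iterator = recs.NextShardIterator
	}
	return nil
}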
MQTT
This can be an important protocol in IOT and industrial use cases, and it's a good example to
include here because it has essentially no support for historical data. Subscribers only receive new
data that's published. Technically, there's some QOS settings that you can use to tolerate smallish
windows of downtime without losing messages, but there's no real guarantees, and there's no way to
re-read messages in order to re-try if an ingestion fails.
Basically, any downtime of the captures driver means you're possibly losing data. We could probably
come up with some way to enable hot-standbys for captures, but it's probably not worth it. If
data loss were a major concern for the user, they'd probably be using a different protocol. But I do
think we'd want to try to provide good observability into the state of the driver. In a kafka
driver, for example, it'd probably be fine for the driver to just log an error and re-try a failed
read for a topic partition. But for an MQTT driver, we may want to do more than just log an error,
and may want to surface some concept of uptime/downtime via the protocol. This would need more work
to ground out, and is maybe something that's better to defer.
The QOS options in MQTT are worth mentioning. Every client needs a unique id. When the client
connects, it can set cleanSession = false to opt in to a "stateful" session that's tied to the
unique id. If a second instance connects to the broker using the same id, the first instance gets
its connection closed. The broker uses the unique id to queue up messages that are sent on topics
the client is subscribed to, as long as those messages are sent with a QOS of 1 or 2 (the default is
0, which is "at most once"). The significant thing here is that QOS is set on a per-message basis by
the producer, and the subscriber's QOS simply acts as a max value that's used. Users would definitely
want to configure their QOS for captures, since it can have a major impact on the broker.
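For reference, here's a minimal persistent-session subscription sketch using the Eclipse Paho Go client; the broker address, client id, and topic are placeholders.

package main

import (
	"log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// CleanSession=false plus a stable ClientID asks the broker to keep a
	// persistent session and queue QOS 1/2 messages while we're disconnected.
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883").   // placeholder broker
		SetClientID("flow-capture-example"). // placeholder, must be unique
		SetCleanSession(false)

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// Subscribing at QOS 1 only caps delivery; messages published at QOS 0 are
	// still delivered at most once, and there's no way to re-read on failure.
	token := client.Subscribe("example/topic", 1, func(c mqtt.Client, m mqtt.Message) {
		// ... ingest m.Payload() here ...
	})
	if token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	select {} // block and keep receiving
}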
CDC
TLDR: This section is just a placeholder for now. This is a gap in my knowledge
that I want to explicitly call out. We will certainly need a better understanding of what our
options are here before we proceed. It's not even clear to me whether CDC should operate on a push
vs a pull based model. What I will say is that there's an absolutely dizzying array of options out
there, often with wildly different integration models. But there's a number of options that claim to
work for a wide variety of databases, and the hope is that we can "just pick one" and not have to
go too far down the CDC rabbit hole.
Partitioning Considerations
Whenever we need multiple shards for a single capture, then we'll have to deal with distributing
source partitions among captures shards. If we only considered Kafka, we might design a system that
looks at the number of topic partitions at catalog build time and creates a shard per partition. But
systems (including Kafka) can change the number of partitions at any time. One answer to that might
be to move partition discovery into the runtime side of the captures protocol, and to have the Flow
runtime use that to dynamically assign source partitions to shards using rendezvous hashing. Another
option is to push that consideration down into captures drivers. My gut says the former option is
probably best, but I'm very open to other ideas there.
The approach of dynamically assigning source partitions to captures shards seems pretty promising,
though. It may also allow for unification of "streaming" (e.g. kafka) and "file-based" (e.g. S3)
systems under a single protocol. Files in S3 could be treated in the same way as partitions of a
Kafka topic. A captures protocol RPC could be used for discovery of partitions/files/whatever, which
could then be read using a separate RPC after the Flow runtime uses rendezvous hashing to determine
whether it should process it. So far this seems to me like the most promising approach to a captures
protocol, but I think it makes sense to get a better idea of what CDC might look like before forming
too strong of an opinion.
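To make the rendezvous hashing idea concrete, here's a small self-contained Go sketch that assigns a stream id to one of N captures shards by scoring every (shard, stream) pair and taking the highest score; shard and stream names are placeholders, and a real implementation would presumably work in terms of each shard's assigned key range rather than shard names.

package main

import (
	"fmt"
	"hash/fnv"
)

// assignShard picks the captures shard responsible for a stream id using
// rendezvous (highest-random-weight) hashing. Adding or removing a shard only
// reassigns the streams that scored highest for that shard, so splits don't
// reshuffle everything.
func assignShard(streamID string, shardIDs []string) string {
	var best string
	var bestScore uint64
	for _, shard := range shardIDs {
		h := fnv.New64a()
		h.Write([]byte(shard))
		h.Write([]byte{0}) // separator between shard and stream ids
		h.Write([]byte(streamID))
		if score := h.Sum64(); score >= bestScore {
			best, bestScore = shard, score
		}
	}
	return best
}

func main() {
	shards := []string{"capture/00", "capture/01", "capture/02", "capture/03"}
	for _, stream := range []string{"topic/0", "topic/1", "bucket/key-a.json"} {
		fmt.Printf("%s -> %s\n", stream, assignShard(stream, shards))
	}
}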