Dekaf materialization endpoint support #1840
Conversation
…s using `/authorize/dekaf` and `/authorize/task`. Also add a hint for passing a collection name as a topic name when the binding has renamed that topic.

* Connector projections: emit recommended constraints for all fields -- you get everything by default, and you can modify the selection however you like
* Schema: build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents
* Extraction: implement field extraction using the `extractors` crate to emit documents that match the "value schema"
… a Session and write them to the correct ops logs journal. Also support filtering logs by the requested shard log level.
Then implement some tests to validate field selection logic.
This is part of #1876 (dekaf: Improvements to handle higher scale): we want to implement broker fallback so that Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch metadata from at least one of the responding brokers and update this list of addresses, so that future sessions know about and can use any newly created members of the cluster. I don't anticipate changing the topology of our cluster that frequently, and if we do, updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this; we'll see. In addition, we want to move people over to the new MSK cluster, so this also implements routing new-style connections to a separate cluster with separate credentials.

A couple of things to note:
* I originally tried to create a single `journal::Client` responsible for appending both logs and stats, but I ended up realizing that `/authorize/task` only allows a token to be authorized for a single task/prefix at a time. So I took the simpler route of creating two clients, rather than teaching `/authorize/task` how to handle multiple tasks, which has some fairly delicate requirements.
* As it turns out, the stats rollups assume the presence of a `shard` field on both logs and stats. So I ended up needing to craft a `ShardRef` that just contains the Dekaf materialization name, and attach it to both the logs and stats documents that get emitted (see the sketch after this list).
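For illustration only, here is a minimal sketch of what broker fallback could look like; the `Connection` type and `connect` helper are placeholders for whatever client Dekaf actually constructs, not the real implementation:

```rust
use std::time::Duration;

// Placeholder connection type and connector, for illustration only.
struct Connection;
async fn connect(_addr: &str) -> anyhow::Result<Connection> {
    Ok(Connection)
}

/// Try each known broker address in order until one responds, so a single
/// unresponsive broker doesn't take down new sessions.
async fn connect_with_fallback(addrs: &[String]) -> anyhow::Result<Connection> {
    let mut last_err = None;
    for addr in addrs {
        match tokio::time::timeout(Duration::from_secs(5), connect(addr)).await {
            Ok(Ok(conn)) => return Ok(conn),
            Ok(Err(err)) => last_err = Some(err),
            Err(_elapsed) => last_err = Some(anyhow::anyhow!("timed out connecting to {addr}")),
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no broker addresses configured")))
}
```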
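A small sketch of the second note, attaching one shard reference to both document types. The field names and the task name here are assumptions for illustration, not the actual `ops::ShardRef` proto:

```rust
use serde::Serialize;
use serde_json::json;

// Illustration only: field names are assumptions, not the real ops::ShardRef.
#[derive(Serialize, Clone)]
struct ShardRef {
    kind: String, // "materialization"
    name: String, // the Dekaf materialization's task name
}

fn main() {
    let shard = ShardRef {
        kind: "materialization".to_string(),
        name: "acmeCo/my-dekaf-materialization".to_string(), // hypothetical name
    };

    // Both the log and stats documents carry the same `shard` field, which is
    // what the stats rollups expect to find.
    let log = json!({ "shard": &shard, "level": "info", "message": "session started" });
    let stats = json!({ "shard": &shard, "docsTotal": 42, "bytesTotal": 4096 });
    println!("{log}\n{stats}");
}
```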
I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and stats. I tracked that down to an error appending logs, specifically:

```
Grpc(Status { code: DeadlineExceeded, message: "context deadline exceeded" })
```

It turns out that this is the error Gazette returns when the auth token you pass it is expired, and the appending machinery in Dekaf wasn't taking token expiry into account. So this commit refactors `GazetteWriter` to be composed of two `GazetteAppender`s, one for logs and one for stats. Each `GazetteAppender` is capable of refreshing its internal client when necessary.
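A minimal sketch of that shape, assuming hypothetical internals; the real `GazetteAppender` wraps a gazette journal client, which is elided here:

```rust
use std::time::{Duration, Instant};

struct GazetteAppender {
    journal: String,
    token_expiry: Instant,
    // client: journal::Client, // elided in this sketch
}

impl GazetteAppender {
    async fn append(&mut self, data: bytes::Bytes) -> anyhow::Result<()> {
        // Refresh the client before its auth token expires, rather than
        // discovering expiry via a DeadlineExceeded error mid-append.
        if self.token_expiry <= Instant::now() + Duration::from_secs(60) {
            self.refresh_client().await?;
        }
        self.do_append(data).await
    }

    async fn refresh_client(&mut self) -> anyhow::Result<()> {
        // Stand-in: re-authorize and rebuild the inner client for `self.journal`.
        self.token_expiry = Instant::now() + Duration::from_secs(3600);
        Ok(())
    }

    async fn do_append(&mut self, _data: bytes::Bytes) -> anyhow::Result<()> {
        // Stand-in for the actual append to `self.journal`.
        Ok(())
    }
}

/// One appender for logs and one for stats, each refreshing independently.
struct GazetteWriter {
    logs: GazetteAppender,
    stats: GazetteAppender,
}
```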
We can still fetch suspended journals with a regular `ListRequest`. This will return journal specs which contain a `suspend` field. If `journal.spec.suspend.level` is `FULL`, it's not possible to read from that journal, so we need to (roughly sketched below):
* Report both the low and high watermarks as `journal.spec.suspend.offset`
* Serve empty result sets for any read against this partition
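A minimal sketch of that check, using simplified stand-in types rather than the actual broker protos (field names are assumptions):

```rust
#[derive(PartialEq, Clone, Copy)]
enum SuspendLevel {
    None,
    Partial,
    Full,
}

struct Suspend {
    level: SuspendLevel,
    offset: i64,
}

struct JournalSpec {
    suspend: Option<Suspend>,
}

/// Returns (low_watermark, high_watermark, serve_empty_reads) for a partition.
fn watermarks_for(spec: &JournalSpec, current_write_head: i64) -> (i64, i64, bool) {
    match &spec.suspend {
        Some(s) if s.level == SuspendLevel::Full => {
            // Fully suspended: report both watermarks at the suspend offset and
            // serve empty result sets for any fetch against this partition.
            (s.offset, s.offset, true)
        }
        // Simplification: a readable journal reports its current write head.
        _ => (0, current_write_head, false),
    }
}
```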
```rust
    record_bytes += tmp.len();
    buf.extend_from_slice(&tmp);
    tmp.clear();
    Some(buf.split().freeze())
};

input_bytes += next_offset - self.offset;
```
This will count acks... should we skip those?
Yep, we skip acks in the runtime -- they're neither counted as a "doc" nor do we accumulate their bytes.
LGTM! 🚢
I know you're still working on amortizing multiple log appends, but all of the big picture stuff looks solid, and I think this can land as-is.
Going forward (not this PR), a refactor I'd urge you to look into is separating the pre-authorization session loop from the post-authorization loop. A refactored pre-auth session loop would only expect authorization-related messages. It would build up all of the bits of session state & context, and then the moment the session authorization is complete, it would tail-call into the post-authorization handler.
The post-auth handler then gets the benefit of assuming the session context is fully established, that the `tokio::task_local!` is set and `Some`, and it can also use `tracing::instrument` for contextual fields of the session (instead of using `tracing-record-hierarchical` to mutate the parent Span).
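A rough sketch of that split, with hypothetical message and context types standing in for the real session machinery:

```rust
// Hypothetical sketch: a pre-auth loop that only accepts authorization
// messages, then tail-calls into a post-auth handler which can assume a
// fully-established session context.
struct SessionContext {
    task_name: String,
    client_id: Option<String>,
}

enum Message {
    SaslAuthenticate { token: String },
    Other,
}

async fn pre_auth_loop(mut recv: tokio::sync::mpsc::Receiver<Message>) -> anyhow::Result<()> {
    while let Some(msg) = recv.recv().await {
        match msg {
            Message::SaslAuthenticate { token } => {
                let ctx = authorize(&token).await?;
                // Authorization complete: hand the rest of the session to the
                // post-auth handler with its context fully established.
                return post_auth_loop(ctx, recv).await;
            }
            Message::Other => anyhow::bail!("message not allowed before authorization"),
        }
    }
    Ok(())
}

#[tracing::instrument(skip(ctx, recv), fields(task = %ctx.task_name))]
async fn post_auth_loop(
    ctx: SessionContext,
    mut recv: tokio::sync::mpsc::Receiver<Message>,
) -> anyhow::Result<()> {
    while let Some(_msg) = recv.recv().await {
        // Handle post-authorization requests here, with `task` already
        // attached to every span via the instrument attribute.
    }
    Ok(())
}

async fn authorize(_token: &str) -> anyhow::Result<SessionContext> {
    // Stand-in for the real authorization call.
    Ok(SessionContext { task_name: "example/task".to_string(), client_id: None })
}
```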
```rust
tracing::debug!(client_id=?header.client_id, "Got client ID!");
session.client_id = header.client_id.clone().map(|id| id.to_string());
if let Some(client_id) = &header.client_id {
    tracing::Span::current()
```
nit, and no changes in this PR, but noting for the future: the fact that this is reaching for `record_hierarchical` tells me that the session handling ought to be refactored into a pre-authorization handler, which then tail-calls down into a post-authorization handler that uses `tracing::instrument`.
crates/dekaf/src/log_appender.rs
"Got recoverable error trying to write logs, retrying" | ||
); | ||
|
||
tokio::time::sleep(Duration::from_millis(wait_ms)).await; |
`gazette` handles backoff sleeps for you already, so this will be on top of what it's doing.
crates/dekaf/src/log_appender.rs
```rust
loop {
    match resp.try_next().await {
        Ok(_) => return Ok(()),
        Err(RetryError { inner: err, .. })
```
nit: This case seems like it could be removed
```rust
        break name;
    }
    Some(TaskWriterMessage::Log(log)) => {
        pending_logs.push_front(log);
```
What's logged prior to authorization that's valuable to preserve in task logs? We can still direct stuff to application-level logs if needed. I'm just wondering if the juice is worth the squeeze.
```rust
let registry = tracing_subscriber::registry()
    .with(tracing_record_hierarchical::HierarchicalRecord::default())
    .with(
        ops::tracing::Layer::new(
```
glad this worked out! Way better
```rust
/// Note that since avro encoding can happen piecewise, there's never a need to
/// put together the whole extracted document, and instead we can build up the
/// encoded output iteratively
fn extract_and_encode<'a>(
```
👍 nice
```rust
// This lets us add our own "virtual" fields to Dekaf without having to add them to
// doc::Extractor and all of the other platform machinery.
impl CustomizableExtractor {
```
IMO it's very reasonable to add this in `extractors.rs`, but 🤷♂️
`Client::append` is fine if you only have a single buffer to append, but if you want to append an ongoing stream of messages in order, you fundamentally need somewhere to buffer your messages until they can be included in an append request.
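A minimal sketch of that buffering pattern; `append_once` here is a stand-in for the real client append call, not the actual `journal::Client` API:

```rust
use bytes::{Bytes, BytesMut};
use tokio::sync::mpsc;

// Stand-in for a single append request against a journal.
async fn append_once(_journal: &str, _data: Bytes) -> anyhow::Result<()> {
    Ok(())
}

/// Drain an ordered stream of messages into a buffer, and flush one append at
/// a time so ordering is preserved even while new messages keep arriving.
async fn run_appender(journal: String, mut rx: mpsc::Receiver<Bytes>) -> anyhow::Result<()> {
    let mut buf = BytesMut::new();
    while let Some(first) = rx.recv().await {
        buf.extend_from_slice(&first);
        // Opportunistically coalesce whatever else is already queued, so one
        // append request can carry many messages.
        while let Ok(next) = rx.try_recv() {
            buf.extend_from_slice(&next);
        }
        append_once(&journal, buf.split().freeze()).await?;
    }
    Ok(())
}
```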
Description:

This adds the server-side half of Dekaf's support for materialization endpoints. At a high level, Dekaf is just another way to get data out of Flow, and we already have a well-fleshed-out concept for that: materializations. So back in #1665 we introduced support for a new materialization endpoint type, `dekaf`, which lives alongside `local` and `connector` as the third kind of materialization.

The second part of this work is for Dekaf the server to support this mode of operation. Briefly, it needs to:
* Authorize incoming sessions to their materialization task
* Build a schema from each binding's field selection and the collection's projections
* Extract and encode fields from collection documents to match that schema
* Write per-task logs and stats to the correct ops journals

I still have a couple of things on my list before this is fully wrapped up:
* Route `SessionAuthentication::Task` sessions to a new/different backing store for migration purposes