sql,storage: enable configurable group and transactional IDs for Kafka #23792
Conversation
This commit adds documentation for the features added in MaterializeInc#23792. See that PR for details.
Force-pushed from 661116b to 35e05ee.
Kafka sinks getting in shape 🚀
src/storage-client/src/sink.rs
Outdated
// to disable the consumer group protocol entirely.
"group.id" => group_id.clone(),
// Allow Kafka monitoring tools to identify this consumer.
"client.id" => group_id.clone(),
This should be the value from connection.client_id(), right?
Oof, yes, fixed.
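The shape of the fix can be sketched with std-only code. This is illustrative, not the PR's actual API: consumer_config and its parameters are hypothetical names, standing in for the real settings map built in sink.rs.

```rust
use std::collections::BTreeMap;

// Illustrative sketch of the fix discussed above: "group.id" and
// "client.id" are sourced independently, instead of both reusing the
// group ID. Names here are hypothetical, not Materialize's actual API.
fn consumer_config(group_id: &str, client_id: &str) -> BTreeMap<&'static str, String> {
    BTreeMap::from([
        // Identifies the consumer group.
        ("group.id", group_id.to_string()),
        // Lets Kafka monitoring tools identify this consumer; comes from
        // the connection's client ID, not the group ID.
        ("client.id", client_id.to_string()),
    ])
}

fn main() {
    let cfg = consumer_config("materialize-group", "materialize-client");
    assert_eq!(cfg["group.id"], "materialize-group");
    assert_eq!(cfg["client.id"], "materialize-client");
}
```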
src/storage-client/src/sink.rs
Outdated
partition_count: -1,
replication_factor: -1,
cleanup_policy: TopicCleanupPolicy::Retention {
    ms: Some(-1),
Is this a behavior change? If I followed the code on main correctly we're currently setting the broker defaults when creating the data topic. If we plan to eventually expose this in SQL it would be better to leave this the same so when we run the future migration we can safely set all existing sinks to use the broker default explicitly
Eek! Yes, thanks. Total oversight due to the confusing API for this function. Fixed now.
Read all the code but paid much closer attention to the option handling changes in the SQL crate.
src/storage/src/source/kafka.rs
Outdated
// We want a fairly low ceiling on our polling frequency, since we rely
// on this heartbeat to determine the health of our Kafka connection.
let metadata_refresh_frequency =
    metadata_refresh_interval.min(Duration::from_secs(60));
let topic_metadata_refresh_interval =
nit: this variable name confused me; it felt unclear why it's the same as the setting, albeit with a ceiling applied. Maybe something like topic_metadata_refresh_polling_interval?
Changed to poll_interval!
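The clamping logic from the diff above can be isolated as a small std-only sketch (the free function here is an illustration, not the actual code structure in kafka.rs):

```rust
use std::time::Duration;

// Sketch of the clamping discussed above: the configured topic metadata
// refresh interval is used as the poll interval, but capped at 60 seconds
// so that polling can double as a connection-health heartbeat.
fn poll_interval(metadata_refresh_interval: Duration) -> Duration {
    metadata_refresh_interval.min(Duration::from_secs(60))
}

fn main() {
    // Short intervals pass through unchanged...
    assert_eq!(poll_interval(Duration::from_secs(30)), Duration::from_secs(30));
    // ...while long intervals are capped at the 60s ceiling.
    assert_eq!(poll_interval(Duration::from_secs(600)), Duration::from_secs(60));
}
```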
@@ -819,39 +819,36 @@ impl<T: AstInfo> AstDisplay for CreateConnectionOption<T> {
impl_display_t!(CreateConnectionOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaConfigOptionName {
    CompressionType,
pub enum KafkaSourceConfigOptionName {
🙇 –– I've been meaning to fix this for weeks and just couldn't get around to it. tysm.
@@ -1793,13 +1793,6 @@ feature_flags!(
    internal: true,
    enable_for_item_parsing: true,
},
{
    name: enable_kafka_config_denylist_options,
🎉
src/storage/src/sink/kafka.rs
Outdated
.await
.check_ssh_status(fence_producer.context())?;
let client_id = connection.client_id(connection_context, sink_id);
let transactional_id = connection
Allowing users to specify a particular id here introduces the risk of collisions, in case they specify the same transactional id for multiple sinks.
Can we get away with the <X> PREFIX-type options we've provided elsewhere?
Also: we're going to need multiple transactional ids if we ever get around to horizontally scaling Kafka sinks, so we may want to reserve some flexibility for that reason too.
As discussed on Slack: done!
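The PREFIX-style approach settled on here can be sketched as follows. All names are hypothetical, not the PR's actual API; the worker index only illustrates the flexibility mentioned for horizontally scaled sinks.

```rust
// Hypothetical sketch of a PREFIX-style option: the user supplies only a
// prefix, and a per-sink (and potentially per-worker) suffix is appended,
// so two sinks configured with the same prefix never collide.
fn transactional_id(prefix: &str, sink_id: &str, worker_id: usize) -> String {
    // A worker index leaves room for horizontally scaled sinks, which
    // would need one transactional ID per worker.
    format!("{prefix}-{sink_id}-{worker_id}")
}

fn main() {
    let a = transactional_id("my-prefix", "u1", 0);
    let b = transactional_id("my-prefix", "u2", 0);
    // Same user-supplied prefix, distinct sinks: no collision.
    assert_ne!(a, b);
    assert_eq!(a, "my-prefix-u1-0");
}
```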
src/storage-types/src/sinks.rs
Outdated
) -> Cow<str> {
    match &self.progress_group_id_prefix {
        None => Cow::Owned(self.client_id(connection_context, sink_id)),
        Some(progress_group_id_prefix) => Cow::Borrowed(progress_group_id_prefix),
We want to add a suffix to the prefix here instead of just piping it through, yeah?
As discussed on Slack: done!
src/storage-types/src/sources.rs
Outdated
) -> Cow<str> {
    match &self.group_id_prefix {
        None => Cow::Owned(self.client_id(connection_context, source_id)),
        Some(group_id_prefix) => Cow::Borrowed(group_id_prefix),
Ditto - seems like either we need to add a suffix or rename this method.
As discussed on Slack: done!
Migration LGTM, though I think it would be reassuring to add an upgrade test somewhere.
@jkosh44 Disregard last message; too many open PRs :D.
Force-pushed from 35e05ee to c5c51d9.
Force-pushed from c5c51d9 to 9b98774.
Ok, I think this is plausibly mergeable. I addressed the bugs @petrosagg spotted (many thanks!), and adjusted the prefixing behavior per my conversation with @bkirwi.
Force-pushed from 9b98774 to acea2ad.
// meaning. For example, `partition_count` could have type
// `Option<NonNeg<i32>>`, where `-1` is prohibited as a value and the Rustier
// `None` value represents the broker default (n.b.: `u32` is not a good choice
// because the maximum number of Kafka partitions is `i32::MAX`, not
True, but I think a user would have an equally bad time even if i32::MAX was accepted :D
I pushed a commit that adds support for
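The comment's suggestion can be sketched with a small std-only type. TopicConfig and to_raw are illustrative names, not the PR's actual types:

```rust
// Sketch of the suggestion above: model "broker default" with `None`
// rather than librdkafka's -1 sentinel, so -1 can be rejected as a user
// value. `TopicConfig` is illustrative, not Materialize's actual type.
#[derive(Debug, Clone, Copy, PartialEq)]
struct TopicConfig {
    /// `None` requests the broker default (librdkafka sentinel: -1).
    /// `i32` (not `u32`) because Kafka caps partition counts at `i32::MAX`.
    partition_count: Option<i32>,
    replication_factor: Option<i32>,
}

impl TopicConfig {
    /// Lower to the raw values librdkafka expects, where -1 means
    /// "use the broker default".
    fn to_raw(&self) -> (i32, i32) {
        (
            self.partition_count.unwrap_or(-1),
            self.replication_factor.unwrap_or(-1),
        )
    }
}

fn main() {
    let cfg = TopicConfig { partition_count: Some(3), replication_factor: None };
    assert_eq!(cfg.to_raw(), (3, -1));
}
```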
Force-pushed from c4e9d7c to 274e113.
Looks good! (I have not reviewed the SQL-parsey stuff in detail, but the new ids and the migration look reasonable to me.)
(One commit message mentions that the new id format includes the region ID, but we include the environment id instead. Might be worth editing the message if we're not squashing the PR, but that's very minor and also seems fine to punt if you prefer.)
// Disable Kafka auto commit. We manually commit offsets
// to Kafka once we have reclocked those offsets, so
// that users can use standard Kafka tools for progress
// tracking.
🙏
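The offset policy in that comment can be sketched with an illustrative std-only type (OffsetTracker and its methods are hypothetical names, not Materialize code):

```rust
// Sketch of the policy in the comment above: auto-commit is off, and
// offsets are committed back to Kafka manually, only once they have been
// reclocked, so standard Kafka progress tooling reflects actual progress.
#[derive(Default)]
struct OffsetTracker {
    reclocked_upper: Option<i64>,
}

impl OffsetTracker {
    /// Record that all offsets up to `offset` have been reclocked.
    fn reclock(&mut self, offset: i64) {
        self.reclocked_upper = Some(offset);
    }
    /// Only reclocked offsets are ever eligible for a manual commit.
    fn offset_to_commit(&self) -> Option<i64> {
        self.reclocked_upper
    }
}

fn main() {
    let mut tracker = OffsetTracker::default();
    assert_eq!(tracker.offset_to_commit(), None); // nothing reclocked yet
    tracker.reclock(42);
    assert_eq!(tracker.offset_to_commit(), Some(42));
}
```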
WHERE mz_sinks.name = 'working'
starting
running
paused
🥇
Force-pushed from 274e113 to 39ad82d.
Thanks for doing the I think it's pretty easy to prevent users from typing
Indeed, in fact I had mostly written this when I thought "Nah, no sane user will actually type it out by looking at our PRs", but yeah a
Force-pushed from 39ad82d to 2ee7f6e.
Added, and marked for automerge! 🤞🏽
Drop the `MS` from `TOPIC METADATA REFRESH INTERVAL MS`. The new option is spelled `TOPIC METADATA REFRESH INTERVAL` and takes a SQL interval rather than a raw number of milliseconds. This option is unsafe, so this is not a backwards incompatible change. This commit also adjusts the implementation so that `topic_metadata_refresh_interval` is a dedicated field of a `KafkaSourceConnection`. Part of #14656.
This commit removes the following options that configure the topic created for the Kafka sink: * `PARTITION COUNT` * `REPLICATION FACTOR` * `RETENTION MS` * `RETENTION BYTES` These options are all behind unsafe mode right now, so this is a backwards compatible change. The rationale: there are semantic questions about how these options should behave if the Kafka topic already exists. They also should be named `TOPIC PARTITION COUNT`, `TOPIC REPLICATION FACTOR`, etc., and equally available for the progress topic on the Kafka connection. Rather than spending the time trying to fix them now, just remove them, to keep the code simpler. Fix #14656.
Splitting the parsing and planning of Kafka sources and sinks is a net simplification for the code. Even though a few options are shared between sources and sinks (TOPIC, GROUP ID PREFIX), most are not. Having separate types for sources and sinks avoids a separate validation step to determine whether each option is valid for the context. This is a pure refactoring commit.
Allow users to configure the consumer group and transactional IDs when creating Kafka sources and sinks. Specifically: * The stabilized `GROUP ID PREFIX` option for Kafka sources allows setting the prefix of the consumer group ID. * The renamed and stabilized `PROGRESS GROUP ID PREFIX` option for Kafka sinks allows setting the prefix of the consumer group ID used when Materialize reads records from the sink's progress topic. * The new and instastabilized `TRANSACTIONAL ID PREFIX` option for Kafka sinks allows setting the transactional ID prefix used when Materialize produces records to the sink's data topic. Also change the default progress group and transactional ID to be materialize-{REGION-ID}-{CONNECTION-ID}-{SINK-ID} to be in line with the default group ID for Kafka sources. Importantly, the new default includes the region ID, which makes it possible to point multiple Materialize environments at the same Kafka cluster without conflicts. This is a backwards incompatible change, so this commit adds a migration to keep the group and transactional IDs for existing Kafka sinks the same. Fix #11357. Fix #23198.
Signed-off-by: Petros Angelatos <[email protected]>
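The default ID format described in the commit message above can be sketched as follows. The function name and sample IDs are illustrative; also note a review comment points out the first component is actually the environment ID, not the region ID.

```rust
// Sketch of the default ID format from the commit message above:
// materialize-{REGION-ID}-{CONNECTION-ID}-{SINK-ID}. Names and sample
// IDs here are illustrative, not Materialize's actual values.
fn default_kafka_id(region_id: &str, connection_id: &str, object_id: &str) -> String {
    format!("materialize-{region_id}-{connection_id}-{object_id}")
}

fn main() {
    // Including the region/environment ID means two environments pointed
    // at the same Kafka cluster produce distinct group/transactional IDs.
    let env_a = default_kafka_id("env-a", "u1", "u5");
    let env_b = default_kafka_id("env-b", "u1", "u5");
    assert_ne!(env_a, env_b);
    assert_eq!(env_a, "materialize-env-a-u1-u5");
}
```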
Force-pushed from 2ee7f6e to b61fc78.
Allow users to configure the consumer group and transactional IDs when creating Kafka sources and sinks. Specifically:

* The stabilized GROUP ID PREFIX option for Kafka sources allows setting the prefix of the consumer group ID.
* The renamed and stabilized PROGRESS GROUP ID PREFIX option for Kafka sinks allows setting the prefix of the consumer group ID used when Materialize reads records from the sink's progress topic.
* The new and instastabilized TRANSACTIONAL ID option for Kafka sinks allows setting the transactional ID used when Materialize produces records to the sink's data topic.

Also change the default progress group and transactional ID to be materialize-{REGION-ID}-{CONNECTION-ID}-{SINK-ID}, to be in line with the default group ID for Kafka sources. Importantly, the new default includes the region ID, which makes it possible to point multiple Materialize environments at the same Kafka cluster without conflicts.

This is a backwards incompatible change, so this commit adds a migration to keep the group and transactional IDs for existing Kafka sinks the same.

Fix https://github.com/MaterializeInc/database-issues/issues/3308.
Fix https://github.com/MaterializeInc/database-issues/issues/6987.
Motivation
Tips for reviewer
Go commit by commit. Commit messages are fairly detailed.
@sploiselle: could use your close review on all the SQL options handling refactoring. Unfortunately each and every commit includes some options handling changes. Commits 1 and 3 are particularly option handling heavy.
@jkosh44 and/or @mjibson: could use your close review on the migration to lock in the old transactional and group IDs for existing sinks.
@bkirwi and/or @petrosagg: you can probably skip the SQL options handling and focus on the new format of the group and transactional IDs.
Documentation PR: #23793.
Also, I know there's a desire to bring back the PARTITION COUNT, REPLICATION FACTOR, etc. options that are removed in this PR. I wrote up an issue describing the design questions we need to answer in order to bring them back: https://github.com/MaterializeInc/database-issues/issues/7143. There were enough open questions that it seemed worth simplifying the code by removing the existing implementation. I think it's likely that the new implementation would have to rewrite much of that code anyway.

Checklist
If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.