Merge pull request #23792 from benesch/topic-metadata-refresh-interval
sql,storage: enable configurable group and transactional IDs for Kafka
benesch authored Dec 14, 2023
2 parents 67fd623 + b61fc78 commit d68a5de
Showing 38 changed files with 1,015 additions and 981 deletions.
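For orientation: the headline change is that Kafka sinks gain configurable progress group and transactional ID prefixes, while Kafka sources switch to an interval-typed TOPIC METADATA REFRESH INTERVAL option. A hedged sketch of the new sink syntax, assembled from the AST options added in src/sql-parser below; the sink, relation, connection, topic, and prefix names are placeholders, and the FORMAT/ENVELOPE clauses are assumed from existing Materialize syntax rather than shown in this diff:

    CREATE SINK my_sink
      FROM my_view
      INTO KAFKA CONNECTION kafka_conn (
        TOPIC 'sink-topic',
        PROGRESS GROUP ID PREFIX 'analytics-progress-',
        TRANSACTIONAL ID PREFIX 'analytics-txn-'
      )
      FORMAT JSON
      ENVELOPE DEBEZIUM;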
2 changes: 1 addition & 1 deletion doc/user/content/sql/system-catalog/mz_internal.md
@@ -286,7 +286,7 @@ The `mz_kafka_sources` table contains a row for each Kafka source in the system.
| Field | Type | Meaning |
|------------------------|----------------|-----------------------------------------------------------------------------------------------------------|
| `id` | [`text`] | The ID of the Kafka source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `group_id_base` | [`text`] | The prefix of the group ID that Materialize will use when consuming data for the Kafka source. |
| `group_id_prefix` | [`text`] | The prefix of the group ID that Materialize will use when consuming data for the Kafka source. |
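For illustration, the renamed column can be inspected with a query along these lines (a sketch; it assumes the standard `name` column on `mz_catalog.mz_sources`):

    SELECT s.name, k.group_id_prefix
    FROM mz_internal.mz_kafka_sources k
    JOIN mz_catalog.mz_sources s ON s.id = k.id;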

### `mz_materialization_lag`

@@ -28,7 +28,6 @@ def initialize(self) -> Testdrive:
"""
$[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_create_source_denylist_with_options = true
ALTER SYSTEM SET enable_kafka_config_denylist_options = true
$ kafka-create-topic topic=multiple-partitions-topic
@@ -37,7 +36,7 @@
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
> CREATE SOURCE multiple_partitions_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-multiple-partitions-topic-${testdrive.seed}', TOPIC METADATA REFRESH INTERVAL MS 500)
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-multiple-partitions-topic-${testdrive.seed}', TOPIC METADATA REFRESH INTERVAL '500ms')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT
41 changes: 37 additions & 4 deletions src/adapter/src/catalog/migrate.rs
@@ -14,7 +14,7 @@ use mz_catalog::durable::Transaction;
use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NowFn};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::Raw;
use mz_sql::ast::{Raw, Statement};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::GenericSourceConnection;
@@ -33,7 +33,7 @@
F: for<'a> FnMut(
&'a mut Transaction<'_>,
&'a &ConnCatalog<'_>,
&'a mut mz_sql::ast::Statement<Raw>,
&'a mut Statement<Raw>,
) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
let mut updated_items = BTreeMap::new();
@@ -70,8 +70,8 @@ pub(crate) async fn migrate(

// Perform per-item AST migrations.
let conn_cat = state.for_system_session();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _item| {
let _catalog_version = catalog_version.clone();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, stmt| {
let catalog_version = catalog_version.clone();
Box::pin(async move {
// Add per-item AST migrations below.
//
@@ -87,6 +87,10 @@
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.

if catalog_version <= Version::new(0, 79, u64::MAX) {
ast_rewrite_create_sink_into_kafka_options_0_80_0(stmt)?;
}

Ok(())
})
})
@@ -244,6 +248,35 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0(
Ok(())
}

fn ast_rewrite_create_sink_into_kafka_options_0_80_0(
stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
use mz_sql::ast::visit_mut::VisitMut;
use mz_sql::ast::{
CreateSinkConnection, CreateSinkStatement, KafkaSinkConfigOption,
KafkaSinkConfigOptionName, Value, WithOptionValue,
};

struct Rewriter;

impl<'ast> VisitMut<'ast, Raw> for Rewriter {
fn visit_create_sink_statement_mut(&mut self, node: &'ast mut CreateSinkStatement<Raw>) {
match &mut node.connection {
CreateSinkConnection::Kafka { options, .. } => {
options.push(KafkaSinkConfigOption {
name: KafkaSinkConfigOptionName::LegacyIds,
value: Some(WithOptionValue::Value(Value::Boolean(true))),
});
}
}
}
}

Rewriter.visit_statement_mut(stmt);

Ok(())
}

fn _add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
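The net effect of ast_rewrite_create_sink_into_kafka_options_0_80_0 on a pre-0.80 Kafka sink definition is roughly the following (a sketch with placeholder names; the FORMAT and ENVELOPE clauses are assumed and are unchanged by the migration):

    -- Before the migration:
    CREATE SINK my_sink FROM my_view
      INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-topic')
      FORMAT JSON ENVELOPE DEBEZIUM;
    -- After the migration, the rewriter appends the LEGACY IDS flag:
    CREATE SINK my_sink FROM my_view
      INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-topic', LEGACY IDS = true)
      FORMAT JSON ENVELOPE DEBEZIUM;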
2 changes: 1 addition & 1 deletion src/catalog/src/builtin.rs
@@ -1890,7 +1890,7 @@ pub static MZ_KAFKA_SOURCES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
schema: MZ_INTERNAL_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::String.nullable(false))
.with_column("group_id_base", ScalarType::String.nullable(false)),
.with_column("group_id_prefix", ScalarType::String.nullable(false)),
is_retained_metrics_object: false,
sensitivity: DataSensitivity::Public,
});
6 changes: 6 additions & 0 deletions src/kafka-util/src/client.rs
@@ -34,6 +34,12 @@ use rdkafka::{ClientContext, Statistics, TopicPartitionList};
use tokio::runtime::Handle;
use tracing::{debug, error, info, warn, Level};

/// A reasonable default interval at which to refresh topic metadata.
// 30s may seem infrequent, but the default is 5m. More frequent metadata
// refresh rates are surprising to Kafka users, as topic partition counts hardly
// ever change in production.
pub const DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(30);

/// A reasonable default timeout when fetching metadata or partitions.
pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(10);

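The new DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL applies when a source does not set the option explicitly; a per-source override uses the interval syntax from the testdrive change above, for example (a sketch; the connection, topic, and source names are placeholders, and FORMAT BYTES merely stands in for any supported format):

    CREATE SOURCE fast_refresh_source
      FROM KAFKA CONNECTION kafka_conn (
        TOPIC 'events',
        TOPIC METADATA REFRESH INTERVAL '1m'
      )
      FORMAT BYTES;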
5 changes: 3 additions & 2 deletions src/sql-lexer/src/keywords.txt
@@ -169,6 +169,7 @@ Hour
Hours
Id
Idle
Ids
If
Ignore
Ilike
@@ -204,6 +205,7 @@ Latest
Leading
Least
Left
Legacy
Level
Like
Limit
@@ -229,7 +231,6 @@ Minutes
Mode
Month
Months
Ms
Mutually
Name
Names
@@ -307,7 +308,6 @@ Replication
Reset
Respect
Restrict
Retention
Return
Returning
Revoke
@@ -384,6 +384,7 @@ Tpch
Trace
Trailing
Transaction
Transactional
Trim
True
Tunnel
121 changes: 70 additions & 51 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -819,47 +819,36 @@ impl<T: AstInfo> AstDisplay for CreateConnectionOption<T> {
impl_display_t!(CreateConnectionOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaConfigOptionName {
CompressionType,
pub enum KafkaSourceConfigOptionName {
GroupIdPrefix,
Topic,
TopicMetadataRefreshIntervalMs,
TopicMetadataRefreshInterval,
StartTimestamp,
StartOffset,
PartitionCount,
ReplicationFactor,
RetentionMs,
RetentionBytes,
}

impl AstDisplay for KafkaConfigOptionName {
impl AstDisplay for KafkaSourceConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
KafkaConfigOptionName::CompressionType => "COMPRESSION TYPE",
KafkaConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
KafkaConfigOptionName::Topic => "TOPIC",
KafkaConfigOptionName::TopicMetadataRefreshIntervalMs => {
"TOPIC METADATA REFRESH INTERVAL MS"
KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
KafkaSourceConfigOptionName::Topic => "TOPIC",
KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
"TOPIC METADATA REFRESH INTERVAL"
}
KafkaConfigOptionName::StartOffset => "START OFFSET",
KafkaConfigOptionName::StartTimestamp => "START TIMESTAMP",
KafkaConfigOptionName::PartitionCount => "PARTITION COUNT",
KafkaConfigOptionName::ReplicationFactor => "REPLICATION FACTOR",
KafkaConfigOptionName::RetentionBytes => "RETENTION BYTES",
KafkaConfigOptionName::RetentionMs => "RETENTION MS",
KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
})
}
}
impl_display!(KafkaConfigOptionName);
impl_display!(KafkaSourceConfigOptionName);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// An option in a `{FROM|INTO} CONNECTION ...` statement.
pub struct KafkaConfigOption<T: AstInfo> {
pub name: KafkaConfigOptionName,
pub struct KafkaSourceConfigOption<T: AstInfo> {
pub name: KafkaSourceConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}

impl<T: AstInfo> AstDisplay for KafkaConfigOption<T> {
impl<T: AstInfo> AstDisplay for KafkaSourceConfigOption<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&self.name);
if let Some(v) = &self.value {
@@ -868,32 +857,46 @@ impl<T: AstInfo> AstDisplay for KafkaConfigOption<T> {
}
}
}
impl_display_t!(KafkaConfigOption);
impl_display_t!(KafkaSourceConfigOption);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaConnection<T: AstInfo> {
pub connection: T::ItemName,
pub options: Vec<KafkaConfigOption<T>>,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
CompressionType,
ProgressGroupIdPrefix,
Topic,
TransactionalIdPrefix,
LegacyIds,
}

impl<T: AstInfo> AstDisplay for KafkaConnection<T> {
impl AstDisplay for KafkaSinkConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("CONNECTION ");
f.write_node(&self.connection);
if !self.options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(&self.options));
f.write_str(")");
}
f.write_str(match self {
KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
KafkaSinkConfigOptionName::Topic => "TOPIC",
KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
})
}
}
impl_display_t!(KafkaConnection);
impl_display!(KafkaSinkConfigOptionName);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaSourceConnection<T: AstInfo> {
pub connection: KafkaConnection<T>,
pub key: Option<Vec<Ident>>,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
pub name: KafkaSinkConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}

impl<T: AstInfo> AstDisplay for KafkaSinkConfigOption<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&self.name);
if let Some(v) = &self.value {
f.write_str(" = ");
f.write_node(v);
}
}
}
impl_display_t!(KafkaSinkConfigOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
@@ -936,7 +939,10 @@ impl_display_t!(PgConfigOption);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
Kafka(KafkaSourceConnection<T>),
Kafka {
connection: T::ItemName,
options: Vec<KafkaSourceConfigOption<T>>,
},
Postgres {
/// The postgres connection.
connection: T::ItemName,
@@ -954,12 +960,15 @@ pub enum CreateSourceConnection<T: AstInfo> {
impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CreateSourceConnection::Kafka(KafkaSourceConnection { connection, key }) => {
f.write_str("KAFKA ");
CreateSourceConnection::Kafka {
connection,
options,
} => {
f.write_str("KAFKA CONNECTION ");
f.write_node(connection);
if let Some(key) = key.as_ref() {
f.write_str(" KEY (");
f.write_node(&display::comma_separated(key));
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
@@ -1056,17 +1065,27 @@ impl_display_t!(LoadGeneratorOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
Kafka {
connection: KafkaConnection<T>,
connection: T::ItemName,
options: Vec<KafkaSinkConfigOption<T>>,
key: Option<KafkaSinkKey>,
},
}

impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CreateSinkConnection::Kafka { connection, key } => {
f.write_str("KAFKA ");
CreateSinkConnection::Kafka {
connection,
options,
key,
} => {
f.write_str("KAFKA CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
if let Some(key) = key.as_ref() {
f.write_node(key);
}
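With the AstDisplay changes above, both CREATE SOURCE and CREATE SINK now render every Kafka option inside a single parenthesized list following KAFKA CONNECTION. A source-side sketch of that shape (placeholder names; the FORMAT clause is assumed from existing syntax):

    CREATE SOURCE prefixed_source
      FROM KAFKA CONNECTION kafka_conn (
        TOPIC 'events',
        GROUP ID PREFIX 'analytics-'
      )
      FORMAT BYTES;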