sql,storage: enable configurable group and transactional IDs for Kafka
Allow users to configure the consumer group and transactional IDs when
creating Kafka sources and sinks. Specifically:

  * The stabilized `GROUP ID PREFIX` option for Kafka sources allows
    setting the prefix of the consumer group ID.

  * The renamed and stabilized `PROGRESS GROUP ID PREFIX` option for
    Kafka sinks allows setting the prefix of the consumer group ID used
    when Materialize reads records from the sink's progress topic.

  * The new and insta-stabilized `TRANSACTIONAL ID` option for Kafka
    sinks allows setting the transactional ID used when Materialize
    produces records to the sink's data topic.
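
For example, the new and renamed options can be specified when
creating Kafka sources and sinks. This is a hedged sketch: the
connection, topics, formats, and option values shown are illustrative
and not part of this commit.

    CREATE SOURCE kafka_src
      FROM KAFKA CONNECTION kafka_conn (
        TOPIC 'src-topic',
        GROUP ID PREFIX 'analytics-'
      )
      FORMAT BYTES;

    CREATE SINK kafka_snk
      FROM my_view
      INTO KAFKA CONNECTION kafka_conn (
        TOPIC 'snk-topic',
        PROGRESS GROUP ID PREFIX 'analytics-progress-',
        TRANSACTIONAL ID 'analytics-snk-txn'
      )
      FORMAT JSON
      ENVELOPE DEBEZIUM;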

Also change the default progress group and transactional IDs to be

    materialize-{REGION-ID}-{CONNECTION-ID}-{SINK-ID}

bringing them in line with the default group ID for Kafka sources.

Importantly, the new default includes the region ID, which makes it
possible to point multiple Materialize environments at the same Kafka
cluster without conflicts.

This is a backwards-incompatible change, so this commit adds a
migration to keep the group and transactional IDs for existing Kafka
sinks the same.
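
For an existing sink, the migration pins the old defaults by writing
them as explicit options on the sink's `CREATE SINK` statement, along
the lines of the following (the sink ID `u123` is illustrative):

    PROGRESS GROUP ID PREFIX 'materialize-bootstrap-sink-u123'
    TRANSACTIONAL ID 'mz-producer-u123-0'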

Fix #11357.
Fix #23198.
benesch committed Dec 10, 2023
1 parent 07abddc commit 661116b
Showing 30 changed files with 453 additions and 199 deletions.
@@ -28,7 +28,6 @@ def initialize(self) -> Testdrive:
"""
$[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_create_source_denylist_with_options = true
ALTER SYSTEM SET enable_kafka_config_denylist_options = true
$ kafka-create-topic topic=multiple-partitions-topic
59 changes: 54 additions & 5 deletions src/adapter/src/catalog/migrate.rs
@@ -13,8 +13,9 @@ use futures::future::BoxFuture;
use mz_catalog::durable::Transaction;
use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NowFn};
use mz_repr::GlobalId;
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::Raw;
use mz_sql::ast::{Raw, Statement};
use mz_storage_types::connections::ConnectionContext;
use semver::Version;
use tracing::info;
@@ -31,15 +32,16 @@
F: for<'a> FnMut(
&'a mut Transaction<'_>,
&'a &ConnCatalog<'_>,
&'a mut mz_sql::ast::Statement<Raw>,
GlobalId,
&'a mut Statement<Raw>,
) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
let mut updated_items = BTreeMap::new();
let items = tx.loaded_items();
for mut item in items {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

f(tx, &cat, &mut stmt).await?;
f(tx, &cat, item.id, &mut stmt).await?;

item.create_sql = stmt.to_ast_string_stable();

@@ -68,8 +70,8 @@ pub(crate) async fn migrate(

// Perform per-item AST migrations.
let conn_cat = state.for_system_session();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _item| {
let _catalog_version = catalog_version.clone();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, id, stmt| {
let catalog_version = catalog_version.clone();
Box::pin(async move {
// Add per-item AST migrations below.
//
@@ -85,6 +87,10 @@
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.

if catalog_version <= Version::new(0, 79, u64::MAX) {
ast_rewrite_create_sink_into_kafka_options_0_80_0(id, stmt)?;
}

Ok(())
})
})
@@ -112,6 +118,49 @@
// Please include the adapter team on any code reviews that add or edit
// migrations.

fn ast_rewrite_create_sink_into_kafka_options_0_80_0(
id: GlobalId,
stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
use mz_sql::ast::visit_mut::VisitMut;
use mz_sql::ast::{
CreateSinkConnection, CreateSinkStatement, KafkaSinkConfigOption,
KafkaSinkConfigOptionName, Value, WithOptionValue,
};

struct Rewriter {
id: GlobalId,
}

impl<'ast> VisitMut<'ast, Raw> for Rewriter {
fn visit_create_sink_statement_mut(&mut self, node: &'ast mut CreateSinkStatement<Raw>) {
match &mut node.connection {
CreateSinkConnection::Kafka { options, .. } => {
options.push(KafkaSinkConfigOption {
name: KafkaSinkConfigOptionName::ProgressGroupIdPrefix,
value: Some(WithOptionValue::Value(Value::String(format!(
"materialize-bootstrap-sink-{}",
self.id
)))),
});
options.push(KafkaSinkConfigOption {
name: KafkaSinkConfigOptionName::TransactionalId,
value: Some(WithOptionValue::Value(Value::String(format!(
"mz-producer-{}-0",
self.id
)))),
});
}
}
}
}

let mut rewriter = Rewriter { id };
rewriter.visit_statement_mut(stmt);

Ok(())
}

fn _add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
2 changes: 1 addition & 1 deletion src/catalog/src/builtin.rs
@@ -1862,7 +1862,7 @@ pub static MZ_KAFKA_SOURCES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
schema: MZ_INTERNAL_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::String.nullable(false))
.with_column("group_id_base", ScalarType::String.nullable(false)),
.with_column("group_id_prefix", ScalarType::String.nullable(false)),
is_retained_metrics_object: false,
sensitivity: DataSensitivity::Public,
});
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
@@ -382,6 +382,7 @@ Tpch
Trace
Trailing
Transaction
Transactional
Trim
True
Tunnel
6 changes: 4 additions & 2 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -862,16 +862,18 @@ impl_display_t!(KafkaSourceConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
CompressionType,
GroupIdPrefix,
ProgressGroupIdPrefix,
Topic,
TransactionalId,
}

impl AstDisplay for KafkaSinkConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
KafkaSinkConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
KafkaSinkConfigOptionName::Topic => "TOPIC",
KafkaSinkConfigOptionName::TransactionalId => "TRANSACTIONAL ID",
})
}
}
29 changes: 17 additions & 12 deletions src/sql-parser/src/parser.rs
@@ -2369,18 +2369,23 @@
fn parse_kafka_sink_config_option(
&mut self,
) -> Result<KafkaSinkConfigOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[COMPRESSION, GROUP, TOPIC])? {
COMPRESSION => {
self.expect_keyword(TYPE)?;
KafkaSinkConfigOptionName::CompressionType
}
GROUP => {
self.expect_keywords(&[ID, PREFIX])?;
KafkaSinkConfigOptionName::GroupIdPrefix
}
TOPIC => KafkaSinkConfigOptionName::Topic,
_ => unreachable!(),
};
let name =
match self.expect_one_of_keywords(&[COMPRESSION, PROGRESS, TOPIC, TRANSACTIONAL])? {
COMPRESSION => {
self.expect_keyword(TYPE)?;
KafkaSinkConfigOptionName::CompressionType
}
PROGRESS => {
self.expect_keywords(&[GROUP, ID, PREFIX])?;
KafkaSinkConfigOptionName::ProgressGroupIdPrefix
}
TOPIC => KafkaSinkConfigOptionName::Topic,
TRANSACTIONAL => {
self.expect_keyword(ID)?;
KafkaSinkConfigOptionName::TransactionalId
}
_ => unreachable!(),
};
Ok(KafkaSinkConfigOption {
name,
value: self.parse_optional_option_value()?,
5 changes: 3 additions & 2 deletions src/sql/src/kafka_util.rs
@@ -51,8 +51,9 @@ generate_extracted_config!(
KafkaSinkCompressionType,
Default(KafkaSinkCompressionType::None)
),
(GroupIdPrefix, String),
(Topic, String)
(ProgressGroupIdPrefix, String),
(Topic, String),
(TransactionalId, String)
);

impl TryFromValue<Value> for KafkaSinkCompressionType {
55 changes: 10 additions & 45 deletions src/sql/src/plan/statement/ddl.rs
@@ -39,8 +39,8 @@ use mz_sql_parser::ast::{
CreateConnectionOption, CreateConnectionOptionName, CreateConnectionType, CreateTypeListOption,
CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, DeferredItemName,
DocOnIdentifier, DocOnSchema, DropOwnedStatement, KafkaSinkConfigOption,
KafkaSinkConfigOptionName, MaterializedViewOption, MaterializedViewOptionName, SetRoleVar,
UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value,
MaterializedViewOption, MaterializedViewOptionName, SetRoleVar, UnresolvedItemName,
UnresolvedObjectName, UnresolvedSchemaName, Value,
};
use mz_sql_parser::ident;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
@@ -75,10 +75,10 @@ use crate::ast::{
CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DbzMode,
DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, IndexOption,
IndexOptionName, KafkaSourceConfigOptionName, KeyConstraint, LoadGeneratorOption,
LoadGeneratorOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica,
ReferencedSubsources, ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute,
SourceIncludeMetadata, Statement, TableConstraint, UnresolvedDatabaseName, ViewDefinition,
IndexOptionName, KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, PgConfigOption,
PgConfigOptionName, ProtobufSchema, QualifiedReplica, ReferencedSubsources, ReplicaDefinition,
ReplicaOption, ReplicaOptionName, RoleAttribute, SourceIncludeMetadata, Statement,
TableConstraint, UnresolvedDatabaseName, ViewDefinition,
};
use crate::catalog::{
CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
@@ -609,25 +609,6 @@ pub fn plan_create_source(
)
}

// Starting offsets are allowed out with feature flags mode, as they are a simple,
// useful way to specify where to start reading a topic.
const ALLOWED_OPTIONS: &[KafkaSourceConfigOptionName] = &[
KafkaSourceConfigOptionName::StartOffset,
KafkaSourceConfigOptionName::StartTimestamp,
KafkaSourceConfigOptionName::Topic,
];

if let Some(op) = options
.iter()
.find(|op| !ALLOWED_OPTIONS.contains(&op.name))
{
scx.require_feature_flag_w_dynamic_desc(
&vars::ENABLE_KAFKA_CONFIG_DENYLIST_OPTIONS,
format!("FROM KAFKA CONNECTION ({}...)", op.name.to_ast_string()),
format!("permitted options are {}", comma_separated(ALLOWED_OPTIONS)),
)?;
}

let KafkaSourceConfigOptionExtracted {
group_id_prefix,
topic,
@@ -2494,29 +2475,11 @@ fn kafka_sink_builder(
),
};

// Starting offsets are allowed with feature flags mode, as they are a simple,
// useful way to specify where to start reading a topic.
const ALLOWED_OPTIONS: &[KafkaSinkConfigOptionName] = &[
KafkaSinkConfigOptionName::Topic,
KafkaSinkConfigOptionName::CompressionType,
];

if let Some(op) = options
.iter()
.find(|op| !ALLOWED_OPTIONS.contains(&op.name))
{
scx.require_feature_flag_w_dynamic_desc(
&vars::ENABLE_KAFKA_CONFIG_DENYLIST_OPTIONS,
format!("FROM KAFKA CONNECTION ({}...)", op.name.to_ast_string()),
format!("permitted options are {}", comma_separated(ALLOWED_OPTIONS)),
)?;
}

let KafkaSinkConfigOptionExtracted {
topic,
compression_type,
// TODO: plumb group ID through to sink.
group_id_prefix: _,
progress_group_id_prefix,
transactional_id,
seen: _,
}: KafkaSinkConfigOptionExtracted = options.try_into()?;

@@ -2625,6 +2588,8 @@ fn kafka_sink_builder(
key_desc_and_indices,
value_desc,
compression_type,
progress_group_id_prefix,
transactional_id,
}))
}

7 changes: 0 additions & 7 deletions src/sql/src/session/vars.rs
@@ -1793,13 +1793,6 @@ feature_flags!(
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_kafka_config_denylist_options,
desc: "Kafka sources with non-allowlisted options",
default: false,
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_list_length_max,
desc: "the list_length_max function",
21 changes: 10 additions & 11 deletions src/storage-client/src/sink.rs
@@ -18,7 +18,7 @@ use mz_kafka_util::client::{
};
use mz_ore::collections::CollectionExt;
use mz_ore::task;
use mz_repr::{GlobalId, Timestamp};
use mz_repr::Timestamp;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt};
use mz_storage_types::sinks::KafkaSinkConnection;
@@ -32,15 +32,6 @@ use tracing::{info, warn};

use crate::sink::progress_key::ProgressKey;

/// Formatter for Kafka group.id setting
pub struct SinkGroupId;

impl SinkGroupId {
pub fn new(sink_id: GlobalId) -> String {
format!("materialize-bootstrap-sink-{sink_id}")
}
}

pub mod progress_key {
use std::fmt;

@@ -324,6 +315,9 @@ pub async fn build_kafka(
connection_cx: &ConnectionContext,
) -> Result<Option<Timestamp>, ContextCreationError> {
// Fetch the progress of the last incarnation of the sink, if any.
let group_id = connection
.progress_group_id(connection_cx, sink_id)
.into_owned();
let progress_topic = connection.progress_topic(connection_cx).into_owned();
// For details about the two clients constructed here, see
// `determine_latest_progress_record`.
@@ -334,7 +328,12 @@
connection_cx,
MzClientContext::default(),
&btreemap! {
"group.id" => SinkGroupId::new(sink_id),
// Consumer group ID, which may have been overridden by the
// user. librdkafka requires this, even though we'd prefer
// to disable the consumer group protocol entirely.
"group.id" => group_id.clone(),
// Allow Kafka monitoring tools to identify this consumer.
"client.id" => group_id.clone(),
"isolation.level" => isolation_level.into(),
"enable.auto.commit" => "false".into(),
"auto.offset.reset" => "earliest".into(),
2 changes: 2 additions & 0 deletions src/storage-types/src/sinks.proto
@@ -92,6 +92,8 @@ message ProtoKafkaSinkConnectionV2 {
google.protobuf.Empty lz4 = 17;
google.protobuf.Empty zstd = 18;
}
optional string progress_group_id_prefix = 19;
optional string transactional_id = 20;
}

message ProtoPersistSinkConnection {