sql,storage: enable configurable group and transactional IDs for Kafka
Allow users to configure the consumer group and transactional IDs when
creating Kafka sources and sinks. Specifically:

  * The stabilized `GROUP ID PREFIX` option for Kafka sources allows
    setting the prefix of the consumer group ID.

  * The renamed and stabilized `PROGRESS GROUP ID PREFIX` option for
    Kafka sinks allows setting the prefix of the consumer group ID used
    when Materialize reads records from the sink's progress topic.

  * The new and insta-stabilized `TRANSACTIONAL ID PREFIX` option for
    Kafka sinks allows setting the transactional ID prefix used when
    Materialize produces records to the sink's data topic.
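
For illustration, a sketch of the new syntax follows. The option names and overall
shape mirror the parser test data in this commit; the connection, source, sink, and
prefix names are placeholders, and a real sink would typically use an Avro or JSON
format rather than `FORMAT BYTES`.

```sql
-- Kafka source: set the prefix of the consumer group ID (names are hypothetical).
CREATE SOURCE my_source
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'source-topic',
    GROUP ID PREFIX 'analytics-'
  )
  FORMAT BYTES;

-- Kafka sink: set the progress group ID and transactional ID prefixes.
CREATE SINK my_sink FROM my_view
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'sink-topic',
    PROGRESS GROUP ID PREFIX 'analytics-progress-',
    TRANSACTIONAL ID PREFIX 'analytics-txn-'
  )
  FORMAT BYTES;
```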

Also change the default progress group and transactional IDs to

    materialize-{REGION-ID}-{CONNECTION-ID}-{SINK-ID}

which brings them in line with the default group ID for Kafka sources.

Importantly, the new default includes the region ID, which makes it
possible to point multiple Materialize environments at the same Kafka
cluster without conflicts.

This is a backwards-incompatible change, so this commit adds a migration
to keep the group and transactional IDs for existing Kafka sinks the
same.

Fix #11357.
Fix #23198.
benesch committed Dec 13, 2023
1 parent 34b5d10 commit c5c51d9
Showing 34 changed files with 464 additions and 205 deletions.
2 changes: 1 addition & 1 deletion doc/user/content/sql/system-catalog/mz_internal.md
@@ -286,7 +286,7 @@ The `mz_kafka_sources` table contains a row for each Kafka source in the system.
| Field | Type | Meaning |
|------------------------|----------------|-----------------------------------------------------------------------------------------------------------|
| `id` | [`text`] | The ID of the Kafka source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `group_id_base` | [`text`] | The prefix of the group ID that Materialize will use when consuming data for the Kafka source. |
| `group_id_prefix` | [`text`] | The prefix of the group ID that Materialize will use when consuming data for the Kafka source. |
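
A quick usage sketch of the renamed column (illustrative only):

```sql
-- Inspect the consumer group ID prefix Materialize uses for each Kafka source.
SELECT id, group_id_prefix
FROM mz_internal.mz_kafka_sources;
```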

### `mz_materialization_lag`

@@ -28,7 +28,6 @@ def initialize(self) -> Testdrive:
"""
$[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_create_source_denylist_with_options = true
ALTER SYSTEM SET enable_kafka_config_denylist_options = true
$ kafka-create-topic topic=multiple-partitions-topic
59 changes: 54 additions & 5 deletions src/adapter/src/catalog/migrate.rs
@@ -13,8 +13,9 @@ use futures::future::BoxFuture;
use mz_catalog::durable::Transaction;
use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NowFn};
use mz_repr::GlobalId;
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::Raw;
use mz_sql::ast::{Raw, Statement};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::GenericSourceConnection;
@@ -33,15 +34,16 @@ where
F: for<'a> FnMut(
&'a mut Transaction<'_>,
&'a &ConnCatalog<'_>,
&'a mut mz_sql::ast::Statement<Raw>,
GlobalId,
&'a mut Statement<Raw>,
) -> BoxFuture<'a, Result<(), anyhow::Error>>,
{
let mut updated_items = BTreeMap::new();
let items = tx.loaded_items();
for mut item in items {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

f(tx, &cat, &mut stmt).await?;
f(tx, &cat, item.id, &mut stmt).await?;

item.create_sql = stmt.to_ast_string_stable();

@@ -70,8 +72,8 @@ pub(crate) async fn migrate(

// Perform per-item AST migrations.
let conn_cat = state.for_system_session();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _item| {
let _catalog_version = catalog_version.clone();
rewrite_items(tx, &conn_cat, |_tx, _conn_cat, id, stmt| {
let catalog_version = catalog_version.clone();
Box::pin(async move {
// Add per-item AST migrations below.
//
@@ -87,6 +89,10 @@
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.

if catalog_version <= Version::new(0, 79, u64::MAX) {
ast_rewrite_create_sink_into_kafka_options_0_80_0(id, stmt)?;
}

Ok(())
})
})
@@ -244,6 +250,49 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0(
Ok(())
}

fn ast_rewrite_create_sink_into_kafka_options_0_80_0(
id: GlobalId,
stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
use mz_sql::ast::visit_mut::VisitMut;
use mz_sql::ast::{
CreateSinkConnection, CreateSinkStatement, KafkaSinkConfigOption,
KafkaSinkConfigOptionName, Value, WithOptionValue,
};

struct Rewriter {
id: GlobalId,
}

impl<'ast> VisitMut<'ast, Raw> for Rewriter {
fn visit_create_sink_statement_mut(&mut self, node: &'ast mut CreateSinkStatement<Raw>) {
match &mut node.connection {
CreateSinkConnection::Kafka { options, .. } => {
options.push(KafkaSinkConfigOption {
name: KafkaSinkConfigOptionName::ProgressGroupIdPrefix,
value: Some(WithOptionValue::Value(Value::String(format!(
"materialize-bootstrap-sink-{}",
self.id
)))),
});
options.push(KafkaSinkConfigOption {
name: KafkaSinkConfigOptionName::TransactionalIdPrefix,
value: Some(WithOptionValue::Value(Value::String(format!(
"mz-producer-{}-0",
self.id
)))),
});
}
}
}
}

let mut rewriter = Rewriter { id };
rewriter.visit_statement_mut(stmt);

Ok(())
}

fn _add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
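
In effect, the migration above pins each pre-existing Kafka sink to the legacy IDs by
appending the two options to its stored `create_sql`. A hedged before/after sketch
(the sink ID `u123` and the object names are hypothetical, and the statements are
simplified):

```sql
-- Before the migration (stored create_sql, simplified):
CREATE SINK my_sink FROM my_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC = 'sink-topic')
  FORMAT BYTES;

-- After the migration, for a sink whose global ID is u123:
CREATE SINK my_sink FROM my_view
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC = 'sink-topic',
    PROGRESS GROUP ID PREFIX = 'materialize-bootstrap-sink-u123',
    TRANSACTIONAL ID PREFIX = 'mz-producer-u123-0'
  )
  FORMAT BYTES;
```
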
2 changes: 1 addition & 1 deletion src/catalog/src/builtin.rs
@@ -1890,7 +1890,7 @@ pub static MZ_KAFKA_SOURCES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
schema: MZ_INTERNAL_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::String.nullable(false))
.with_column("group_id_base", ScalarType::String.nullable(false)),
.with_column("group_id_prefix", ScalarType::String.nullable(false)),
is_retained_metrics_object: false,
sensitivity: DataSensitivity::Public,
});
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
@@ -382,6 +382,7 @@ Tpch
Trace
Trailing
Transaction
Transactional
Trim
True
Tunnel
6 changes: 4 additions & 2 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -862,16 +862,18 @@ impl_display_t!(KafkaSourceConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
CompressionType,
GroupIdPrefix,
ProgressGroupIdPrefix,
Topic,
TransactionalIdPrefix,
}

impl AstDisplay for KafkaSinkConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
KafkaSinkConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
KafkaSinkConfigOptionName::Topic => "TOPIC",
KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
})
}
}
29 changes: 17 additions & 12 deletions src/sql-parser/src/parser.rs
@@ -2369,18 +2369,23 @@ impl<'a> Parser<'a> {
fn parse_kafka_sink_config_option(
&mut self,
) -> Result<KafkaSinkConfigOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[COMPRESSION, GROUP, TOPIC])? {
COMPRESSION => {
self.expect_keyword(TYPE)?;
KafkaSinkConfigOptionName::CompressionType
}
GROUP => {
self.expect_keywords(&[ID, PREFIX])?;
KafkaSinkConfigOptionName::GroupIdPrefix
}
TOPIC => KafkaSinkConfigOptionName::Topic,
_ => unreachable!(),
};
let name =
match self.expect_one_of_keywords(&[COMPRESSION, PROGRESS, TOPIC, TRANSACTIONAL])? {
COMPRESSION => {
self.expect_keyword(TYPE)?;
KafkaSinkConfigOptionName::CompressionType
}
PROGRESS => {
self.expect_keywords(&[GROUP, ID, PREFIX])?;
KafkaSinkConfigOptionName::ProgressGroupIdPrefix
}
TOPIC => KafkaSinkConfigOptionName::Topic,
TRANSACTIONAL => {
self.expect_keywords(&[ID, PREFIX])?;
KafkaSinkConfigOptionName::TransactionalIdPrefix
}
_ => unreachable!(),
};
Ok(KafkaSinkConfigOption {
name,
value: self.parse_optional_option_value()?,
6 changes: 3 additions & 3 deletions src/sql-parser/tests/testdata/ddl
@@ -451,11 +451,11 @@ CREATE SOURCE psychic FROM POSTGRES CONNECTION pgconn (PUBLICATION = 'red')
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("psychic")]), in_cluster: None, col_names: [], connection: Postgres { connection: Name(UnresolvedItemName([Ident("pgconn")])), options: [PgConfigOption { name: Publication, value: Some(Value(String("red"))) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], referenced_subsources: None, progress_subsource: None })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic', GROUP ID PREFIX 'prefix', COMPRESSION TYPE = gzip) FORMAT BYTES
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic', PROGRESS GROUP ID PREFIX 'prefix', COMPRESSION TYPE = gzip) FORMAT BYTES
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic', GROUP ID PREFIX = 'prefix', COMPRESSION TYPE = gzip) FORMAT BYTES
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic', PROGRESS GROUP ID PREFIX = 'prefix', COMPRESSION TYPE = gzip) FORMAT BYTES
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("foo")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaSinkConfigOption { name: Topic, value: Some(Value(String("topic"))) }, KafkaSinkConfigOption { name: GroupIdPrefix, value: Some(Value(String("prefix"))) }, KafkaSinkConfigOption { name: CompressionType, value: Some(Ident(Ident("gzip"))) }], key: None }, format: Some(Bytes), envelope: None, with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("foo")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaSinkConfigOption { name: Topic, value: Some(Value(String("topic"))) }, KafkaSinkConfigOption { name: ProgressGroupIdPrefix, value: Some(Value(String("prefix"))) }, KafkaSinkConfigOption { name: CompressionType, value: Some(Ident(Ident("gzip"))) }], key: None }, format: Some(Bytes), envelope: None, with_options: [] })

parse-statement
CREATE SINK FROM bar INTO KAFKA CONNECTION baz
5 changes: 3 additions & 2 deletions src/sql/src/kafka_util.rs
@@ -51,8 +51,9 @@ generate_extracted_config!(
KafkaSinkCompressionType,
Default(KafkaSinkCompressionType::None)
),
(GroupIdPrefix, String),
(Topic, String)
(ProgressGroupIdPrefix, String),
(Topic, String),
(TransactionalIdPrefix, String)
);

impl TryFromValue<Value> for KafkaSinkCompressionType {
55 changes: 10 additions & 45 deletions src/sql/src/plan/statement/ddl.rs
@@ -39,8 +39,8 @@ use mz_sql_parser::ast::{
CreateConnectionOption, CreateConnectionOptionName, CreateConnectionType, CreateTypeListOption,
CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, DeferredItemName,
DocOnIdentifier, DocOnSchema, DropOwnedStatement, KafkaSinkConfigOption,
KafkaSinkConfigOptionName, MaterializedViewOption, MaterializedViewOptionName, SetRoleVar,
UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value,
MaterializedViewOption, MaterializedViewOptionName, SetRoleVar, UnresolvedItemName,
UnresolvedObjectName, UnresolvedSchemaName, Value,
};
use mz_sql_parser::ident;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
@@ -75,10 +75,10 @@ use crate::ast::{
CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DbzMode,
DropObjectsStatement, Envelope, Expr, Format, Ident, IfExistsBehavior, IndexOption,
IndexOptionName, KafkaSourceConfigOptionName, KeyConstraint, LoadGeneratorOption,
LoadGeneratorOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica,
ReferencedSubsources, ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute,
SourceIncludeMetadata, Statement, TableConstraint, UnresolvedDatabaseName, ViewDefinition,
IndexOptionName, KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, PgConfigOption,
PgConfigOptionName, ProtobufSchema, QualifiedReplica, ReferencedSubsources, ReplicaDefinition,
ReplicaOption, ReplicaOptionName, RoleAttribute, SourceIncludeMetadata, Statement,
TableConstraint, UnresolvedDatabaseName, ViewDefinition,
};
use crate::catalog::{
CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
@@ -614,25 +614,6 @@ pub fn plan_create_source(
)
}

// Starting offsets are allowed out with feature flags mode, as they are a simple,
// useful way to specify where to start reading a topic.
const ALLOWED_OPTIONS: &[KafkaSourceConfigOptionName] = &[
KafkaSourceConfigOptionName::StartOffset,
KafkaSourceConfigOptionName::StartTimestamp,
KafkaSourceConfigOptionName::Topic,
];

if let Some(op) = options
.iter()
.find(|op| !ALLOWED_OPTIONS.contains(&op.name))
{
scx.require_feature_flag_w_dynamic_desc(
&vars::ENABLE_KAFKA_CONFIG_DENYLIST_OPTIONS,
format!("FROM KAFKA CONNECTION ({}...)", op.name.to_ast_string()),
format!("permitted options are {}", comma_separated(ALLOWED_OPTIONS)),
)?;
}

let KafkaSourceConfigOptionExtracted {
group_id_prefix,
topic,
@@ -2509,29 +2490,11 @@ fn kafka_sink_builder(
),
};

// Starting offsets are allowed with feature flags mode, as they are a simple,
// useful way to specify where to start reading a topic.
const ALLOWED_OPTIONS: &[KafkaSinkConfigOptionName] = &[
KafkaSinkConfigOptionName::Topic,
KafkaSinkConfigOptionName::CompressionType,
];

if let Some(op) = options
.iter()
.find(|op| !ALLOWED_OPTIONS.contains(&op.name))
{
scx.require_feature_flag_w_dynamic_desc(
&vars::ENABLE_KAFKA_CONFIG_DENYLIST_OPTIONS,
format!("FROM KAFKA CONNECTION ({}...)", op.name.to_ast_string()),
format!("permitted options are {}", comma_separated(ALLOWED_OPTIONS)),
)?;
}

let KafkaSinkConfigOptionExtracted {
topic,
compression_type,
// TODO: plumb group ID through to sink.
group_id_prefix: _,
progress_group_id_prefix,
transactional_id,
seen: _,
}: KafkaSinkConfigOptionExtracted = options.try_into()?;

@@ -2640,6 +2603,8 @@ fn kafka_sink_builder(
key_desc_and_indices,
value_desc,
compression_type,
progress_group_id_prefix,
transactional_id_prefix: transactional_id,
}))
}

7 changes: 0 additions & 7 deletions src/sql/src/session/vars.rs
@@ -1793,13 +1793,6 @@ feature_flags!(
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_kafka_config_denylist_options,
desc: "Kafka sources with non-allowlisted options",
default: false,
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_list_length_max,
desc: "the list_length_max function",
21 changes: 10 additions & 11 deletions src/storage-client/src/sink.rs
@@ -18,7 +18,7 @@ use mz_kafka_util::client::{
};
use mz_ore::collections::CollectionExt;
use mz_ore::task;
use mz_repr::{GlobalId, Timestamp};
use mz_repr::Timestamp;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt};
use mz_storage_types::sinks::KafkaSinkConnection;
@@ -32,15 +32,6 @@ use tracing::{info, warn};

use crate::sink::progress_key::ProgressKey;

/// Formatter for Kafka group.id setting
pub struct SinkGroupId;

impl SinkGroupId {
pub fn new(sink_id: GlobalId) -> String {
format!("materialize-bootstrap-sink-{sink_id}")
}
}

pub mod progress_key {
use std::fmt;

@@ -333,6 +324,9 @@ pub async fn build_kafka(
storage_configuration: &StorageConfiguration,
) -> Result<Option<Timestamp>, ContextCreationError> {
// Fetch the progress of the last incarnation of the sink, if any.
let group_id = connection
.progress_group_id(&storage_configuration.connection_context, sink_id)
.into_owned();
let progress_topic = connection
.progress_topic(&storage_configuration.connection_context)
.into_owned();
@@ -345,7 +339,12 @@
storage_configuration,
MzClientContext::default(),
&btreemap! {
"group.id" => SinkGroupId::new(sink_id),
// Consumer group ID, which may have been overridden by the
// user. librdkafka requires this, even though we'd prefer
// to disable the consumer group protocol entirely.
"group.id" => group_id.clone(),
// Allow Kafka monitoring tools to identify this consumer.
"client.id" => group_id.clone(),
"isolation.level" => isolation_level.into(),
"enable.auto.commit" => "false".into(),
"auto.offset.reset" => "earliest".into(),
2 changes: 2 additions & 0 deletions src/storage-types/src/sinks.proto
@@ -92,6 +92,8 @@ message ProtoKafkaSinkConnectionV2 {
google.protobuf.Empty lz4 = 17;
google.protobuf.Empty zstd = 18;
}
optional string progress_group_id_prefix = 19;
optional string transactional_id_prefix = 20;
}

message ProtoPersistSinkConnection {