Skip to content

Commit

Permalink
Merge pull request #30076 from rjobanp/populate-kafka-source-table-cols
Browse files Browse the repository at this point in the history
Populate envelope/key-format/value-format columns on mz_kafka_source_tables table
  • Loading branch information
rjobanp authored Oct 18, 2024
2 parents f5aa028 + 329a9f6 commit 170fa64
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 75 deletions.
3 changes: 3 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,9 @@ table and the corresponding upstream Kafka topic being ingested.
| ------------------- | ---------------- | -------- |
| `id` | [`text`] | The ID of the table. Corresponds to [`mz_catalog.mz_tables.id`](../mz_catalog#mz_tables). |
| `topic` | [`text`] | The topic being ingested. |
| `envelope_type` | [`text`] | The [envelope](/sql/create-source/#envelopes) type: `none`, `upsert`, or `debezium`. `NULL` for other source types. |
| `key_format` | [`text`] | The [format](/sql/create-source/#formats) of the Kafka message key: `avro`, `protobuf`, `csv`, `regex`, `bytes`, `json`, `text`, or `NULL`. |
| `value_format` | [`text`] | The [format](/sql/create-source/#formats) of the Kafka message value: `avro`, `protobuf`, `csv`, `regex`, `bytes`, `json`, or `text`. `NULL` for other source types. |

<!--
## `mz_prepared_statement_history`
Expand Down
27 changes: 23 additions & 4 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,17 @@ impl CatalogState {
"kafka" => {
mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
let topic = external_reference[0].to_ast_string();
self.pack_kafka_source_tables_update(id, &topic, diff)
let envelope = data_source.envelope();
let (key_format, value_format) = data_source.formats();

self.pack_kafka_source_tables_update(
id,
&topic,
envelope,
key_format,
value_format,
diff,
)
}
s => unreachable!("{s} sources do not have tables"),
}
Expand All @@ -551,7 +561,7 @@ impl CatalogState {
CatalogItem::Source(source) => {
let source_type = source.source_type();
let connection_id = source.connection_id();
let envelope = source.envelope();
let envelope = source.data_source.envelope();
let cluster_entry = match source.data_source {
// Ingestion exports don't have their own cluster, but
// run on their ingestion's cluster.
Expand All @@ -563,7 +573,7 @@ impl CatalogState {

let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

let (key_format, value_format) = source.formats();
let (key_format, value_format) = source.data_source.formats();

let mut updates = self.pack_source_update(
id,
Expand Down Expand Up @@ -975,11 +985,20 @@ impl CatalogState {
&self,
id: GlobalId,
topic: &str,
envelope: Option<&str>,
key_format: Option<&str>,
value_format: Option<&str>,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
vec![BuiltinTableUpdate {
id: &*MZ_KAFKA_SOURCE_TABLES,
row: Row::pack_slice(&[Datum::String(&id.to_string()), Datum::String(topic)]),
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::String(topic),
Datum::from(envelope),
Datum::from(key_format),
Datum::from(value_format),
]),
diff,
}]
}
Expand Down
3 changes: 3 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2119,6 +2119,9 @@ pub static MZ_KAFKA_SOURCE_TABLES: LazyLock<BuiltinTable> = LazyLock::new(|| Bui
desc: RelationDesc::builder()
.with_column("id", ScalarType::String.nullable(false))
.with_column("topic", ScalarType::String.nullable(false))
.with_column("envelope_type", ScalarType::String.nullable(true))
.with_column("key_format", ScalarType::String.nullable(true))
.with_column("value_format", ScalarType::String.nullable(true))
.finish(),
is_retained_metrics_object: true,
access: vec![PUBLIC_SELECT],
Expand Down
144 changes: 73 additions & 71 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,79 @@ pub enum DataSourceDesc {
},
}

impl DataSourceDesc {
    /// Returns the names of the `(key, value)` formats of this data source.
    ///
    /// Ingestions report the encoding of their primary export, while
    /// ingestion exports report the encoding of their own data config. The
    /// key slot is `None` when the encoding has no key; both slots are `None`
    /// when there is no encoding at all, and for introspection, webhook, and
    /// progress data sources.
    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
        match self {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                match ingestion_desc.desc.primary_export.encoding.as_ref() {
                    None => (None, None),
                    Some(encoding) => (
                        encoding.key.as_ref().map(|key| key.type_()),
                        Some(encoding.value.type_()),
                    ),
                }
            }
            DataSourceDesc::IngestionExport { data_config, .. } => {
                match data_config.encoding.as_ref() {
                    None => (None, None),
                    Some(encoding) => (
                        encoding.key.as_ref().map(|key| key.type_()),
                        Some(encoding.value.type_()),
                    ),
                }
            }
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => (None, None),
        }
    }

    /// Returns the name of the envelope of this data source, if it has one.
    ///
    /// Note that the string "none" is not the same thing as `None`: some
    /// data sources have no envelope at all (introspection, webhook, and
    /// progress sources yield `None`), while ingestions and ingestion exports
    /// always have an envelope, one of which is called the "NONE"-envelope.
    pub fn envelope(&self) -> Option<&str> {
        use mz_storage_types::sources::envelope::UpsertStyle;

        // NOTE(aljoscha): The envelope-to-string mapping below could live on
        // `SourceEnvelope` itself, but that type feels more like an internal
        // thing, and the adapter should arguably own how envelopes are
        // rendered as strings. It would not be hard to be convinced
        // otherwise, though.
        let envelope: &SourceEnvelope = match self {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                &ingestion_desc.desc.primary_export.envelope
            }
            DataSourceDesc::IngestionExport { data_config, .. } => &data_config.envelope,
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => return None,
        };

        let name = match envelope {
            SourceEnvelope::None(_) => "none",
            SourceEnvelope::Upsert(upsert) => match upsert.style {
                UpsertStyle::Default(_) => "upsert",
                // NOTE(aljoscha): Should this somehow signal that upsert is
                // used internally for DEBEZIUM?
                UpsertStyle::Debezium { .. } => "debezium",
                UpsertStyle::ValueErrInline { .. } => "upsert-value-err-inline",
            },
            // TODO(aljoscha): Should this even be reported? It is currently
            // not exposed.
            SourceEnvelope::CdcV2 => "materialize",
        };
        Some(name)
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Source {
pub create_sql: Option<String>,
Expand Down Expand Up @@ -809,77 +882,6 @@ impl Source {
}
}

/// Returns the names of the `(key, value)` formats of the source.
///
/// The key slot is `None` when the encoding has no key. Both slots are `None`
/// when the source has no encoding, and for introspection, webhook, and
/// progress data sources.
pub fn formats(&self) -> (Option<&str>, Option<&str>) {
    match &self.data_source {
        DataSourceDesc::Ingestion { ingestion_desc, .. } => {
            // Ingestions report the encoding of their primary export.
            let Some(encoding) = ingestion_desc.desc.primary_export.encoding.as_ref() else {
                return (None, None);
            };
            let key_format = encoding.key.as_ref().map(|key| key.type_());
            (key_format, Some(encoding.value.type_()))
        }
        DataSourceDesc::IngestionExport { data_config, .. } => {
            // Ingestion exports report the encoding of their own data config.
            let Some(encoding) = data_config.encoding.as_ref() else {
                return (None, None);
            };
            let key_format = encoding.key.as_ref().map(|key| key.type_());
            (key_format, Some(encoding.value.type_()))
        }
        DataSourceDesc::Introspection(_)
        | DataSourceDesc::Webhook { .. }
        | DataSourceDesc::Progress => (None, None),
    }
}

/// Returns the name of the envelope of the source, if it has one.
///
/// Note that the string "none" (append-only) is not the same thing as
/// `None`: some sources have no envelope at all (internal logs, for example),
/// while other sources have an envelope that we call the "NONE"-envelope.
pub fn envelope(&self) -> Option<&str> {
    /// Maps a source envelope to the string used to describe it.
    fn describe(envelope: &SourceEnvelope) -> &str {
        use mz_storage_types::sources::envelope::UpsertStyle;

        match envelope {
            SourceEnvelope::None(_) => "none",
            // TODO(aljoscha): Should this even be reported? It is currently
            // not exposed.
            SourceEnvelope::CdcV2 => "materialize",
            SourceEnvelope::Upsert(upsert) => match upsert.style {
                UpsertStyle::Default(_) => "upsert",
                UpsertStyle::ValueErrInline { .. } => "upsert-value-err-inline",
                // NOTE(aljoscha): Should this somehow signal that upsert is
                // used internally for DEBEZIUM?
                UpsertStyle::Debezium { .. } => "debezium",
            },
        }
    }

    // NOTE(aljoscha): The handling of ingestions could move into
    // `SourceEnvelope` itself, but that type feels more like an internal
    // thing, and the adapter should arguably own how envelopes are rendered
    // as strings. It would not be hard to be convinced otherwise, though.
    match &self.data_source {
        DataSourceDesc::Introspection(_)
        | DataSourceDesc::Webhook { .. }
        | DataSourceDesc::Progress => None,
        DataSourceDesc::Ingestion { ingestion_desc, .. } => {
            let envelope = &ingestion_desc.desc.primary_export.envelope;
            Some(describe(envelope))
        }
        DataSourceDesc::IngestionExport { data_config, .. } => {
            let envelope = &data_config.envelope;
            Some(describe(envelope))
        }
    }
}

/// Connection ID of the source, if one exists.
pub fn connection_id(&self) -> Option<GlobalId> {
match &self.data_source {
Expand Down
3 changes: 3 additions & 0 deletions test/sqllogictest/autogenerated/mz_internal.slt
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object
----
1 id text
2 topic text
3 envelope_type text
4 key_format text
5 value_format text

query ITT
SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_sessions' ORDER BY position
Expand Down

0 comments on commit 170fa64

Please sign in to comment.