Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/adapter: Add source-id column to mz_catalog.mz_tables and add tables-from-sources to mz_internal source-reference tables #29374

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/user/content/sql/system-catalog/mz_catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ Field | Type | Meaning
`privileges` | [`mz_aclitem array`] | The privileges belonging to the table.
`create_sql` | [`text`] | The `CREATE` SQL statement for the table.
`redacted_create_sql` | [`text`] | The redacted `CREATE` SQL statement for the table.
`source_id` | [`text`] | The ID of the source associated with the table, if any. Corresponds to [`mz_sources.id`](/sql/system-catalog/mz_catalog/#mz_sources).

### `mz_timezone_abbreviations`

Expand Down
238 changes: 155 additions & 83 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,63 +476,20 @@ impl CatalogState {
let owner_id = entry.owner_id();
let privileges_row = self.pack_privilege_array_row(entry.privileges());
let privileges = privileges_row.unpack_first();
let mut updates =
match entry.item() {
Comment on lines -479 to -480
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this diff is big since it the indentation wasn't correctly formatted and now it's outdented the whole match statement, but the only update I made was within the CatalogItem::Table(table) case of the match

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppressing whitespace works perfectly here (https://github.com/MaterializeInc/materialize/pull/29374/files?w=1)—thanks for the callout!

CatalogItem::Log(_) => self.pack_source_update(
id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
privileges, diff, None,
),
CatalogItem::Index(index) => {
self.pack_index_update(id, oid, name, owner_id, index, diff)
}
CatalogItem::Table(table) => self
.pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table),
CatalogItem::Source(source) => {
let source_type = source.source_type();
let connection_id = source.connection_id();
let envelope = source.envelope();
let cluster_entry = match source.data_source {
// Ingestion exports don't have their own cluster, but
// run on their ingestion's cluster.
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
self.get_entry(&ingestion_id)
}
_ => entry,
};

let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

let (key_format, value_format) = source.formats();
let mut updates = match entry.item() {
CatalogItem::Log(_) => self.pack_source_update(
id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
privileges, diff, None,
),
CatalogItem::Index(index) => {
self.pack_index_update(id, oid, name, owner_id, index, diff)
}
CatalogItem::Table(table) => {
let mut updates = self
.pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

let mut updates = self.pack_source_update(
id,
oid,
schema_id,
name,
source_type,
connection_id,
envelope,
key_format,
value_format,
cluster_id.as_deref(),
owner_id,
privileges,
diff,
source.create_sql.as_ref(),
);

updates.extend(match &source.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.connection {
GenericSourceConnection::Postgres(postgres) => {
self.pack_postgres_source_update(id, postgres, diff)
}
GenericSourceConnection::Kafka(kafka) => {
self.pack_kafka_source_update(id, kafka, diff)
}
_ => vec![],
}
}
if let TableDataSource::DataSource(data_source) = &table.data_source {
updates.extend(match data_source {
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference: UnresolvedItemName(external_reference),
Expand Down Expand Up @@ -577,38 +534,139 @@ impl CatalogState {
// Load generator sources don't have any special
// updates.
"load-generator" => vec![],
s => unreachable!("{s} sources do not have subsources"),
// TODO(roshan): Add support for kafka source tables.
s => unreachable!("{s} sources do not have tables"),
}
}
DataSourceDesc::Webhook { .. } => {
vec![self.pack_webhook_source_update(id, diff)]
}
_ => vec![],
});

updates
}
CatalogItem::View(view) => self
.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff),
CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
id, oid, schema_id, name, owner_id, privileges, mview, diff,
),
CatalogItem::Sink(sink) => {
self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
}
CatalogItem::Type(ty) => {
self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
}
CatalogItem::Func(func) => {
self.pack_func_update(id, schema_id, name, owner_id, func, diff)
}
CatalogItem::Secret(_) => {
self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
}
CatalogItem::Connection(connection) => self.pack_connection_update(
id, oid, schema_id, name, owner_id, privileges, connection, diff,
),
};

updates
}
CatalogItem::Source(source) => {
let source_type = source.source_type();
let connection_id = source.connection_id();
let envelope = source.envelope();
let cluster_entry = match source.data_source {
// Ingestion exports don't have their own cluster, but
// run on their ingestion's cluster.
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
self.get_entry(&ingestion_id)
}
_ => entry,
};

let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

let (key_format, value_format) = source.formats();

let mut updates = self.pack_source_update(
id,
oid,
schema_id,
name,
source_type,
connection_id,
envelope,
key_format,
value_format,
cluster_id.as_deref(),
owner_id,
privileges,
diff,
source.create_sql.as_ref(),
);

updates.extend(match &source.data_source {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.connection {
GenericSourceConnection::Postgres(postgres) => {
self.pack_postgres_source_update(id, postgres, diff)
}
GenericSourceConnection::Kafka(kafka) => {
self.pack_kafka_source_update(id, kafka, diff)
}
_ => vec![],
}
}
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference: UnresolvedItemName(external_reference),
details: _,
} => {
let ingestion_entry = self
.get_entry(ingestion_id)
.source_desc()
.expect("primary source exists")
.expect("primary source is a source");

match ingestion_entry.connection.name() {
"postgres" => {
mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
// The left-most qualification of Postgres
// tables is the database, but this
// information is redundant because each
// Postgres connection connects to only one
// database.
let schema_name = external_reference[1].to_ast_string();
let table_name = external_reference[2].to_ast_string();

self.pack_postgres_source_tables_update(
id,
&schema_name,
&table_name,
diff,
)
}
"mysql" => {
mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
let schema_name = external_reference[0].to_ast_string();
let table_name = external_reference[1].to_ast_string();

self.pack_mysql_source_tables_update(
id,
&schema_name,
&table_name,
diff,
)
}
// Load generator sources don't have any special
// updates.
"load-generator" => vec![],
s => unreachable!("{s} sources do not have subsources"),
}
}
DataSourceDesc::Webhook { .. } => {
vec![self.pack_webhook_source_update(id, diff)]
}
_ => vec![],
});

updates
}
CatalogItem::View(view) => {
self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
}
CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
id, oid, schema_id, name, owner_id, privileges, mview, diff,
),
CatalogItem::Sink(sink) => {
self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
}
CatalogItem::Type(ty) => {
self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
}
CatalogItem::Func(func) => {
self.pack_func_update(id, schema_id, name, owner_id, func, diff)
}
CatalogItem::Secret(_) => {
self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
}
CatalogItem::Connection(connection) => self.pack_connection_update(
id, oid, schema_id, name, owner_id, privileges, connection, diff,
),
};

if !entry.item().is_temporary() {
// Populate or clean up the `mz_object_dependencies` table.
Expand Down Expand Up @@ -739,6 +797,15 @@ impl CatalogState {
.ast
.to_ast_string_redacted()
});
let source_id = if let TableDataSource::DataSource(DataSourceDesc::IngestionExport {
ingestion_id,
..
}) = &table.data_source
{
Some(ingestion_id.to_string())
} else {
None
};

vec![BuiltinTableUpdate {
id: &*MZ_TABLES,
Expand All @@ -759,6 +826,11 @@ impl CatalogState {
} else {
Datum::Null
},
if let Some(source_id) = source_id.as_ref() {
Datum::String(source_id)
} else {
Datum::Null
},
]),
diff,
}]
Expand Down
1 change: 1 addition & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2220,6 +2220,7 @@ pub static MZ_TABLES: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
)
.with_column("create_sql", ScalarType::String.nullable(true))
.with_column("redacted_create_sql", ScalarType::String.nullable(true))
.with_column("source_id", ScalarType::String.nullable(true))
.with_key(vec![0])
.with_key(vec![1])
.finish(),
Expand Down
1 change: 1 addition & 0 deletions test/sqllogictest/autogenerated/mz_catalog.slt
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_catalog' AND object
6 privileges mz_aclitem[]
7 create_sql text
8 redacted_create_sql text
9 source_id text

query ITT
SELECT position, name, type FROM objects WHERE schema = 'mz_catalog' AND object = 'mz_timezone_abbreviations' ORDER BY position
Expand Down
1 change: 1 addition & 0 deletions test/sqllogictest/mz_catalog_server_index_accounting.slt
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ mz_tables owner_id
mz_tables privileges
mz_tables redacted_create_sql
mz_tables schema_id
mz_tables source_id
mz_type_pg_metadata id
mz_type_pg_metadata typinput
mz_type_pg_metadata typreceive
Expand Down
Loading