Skip to content

Commit

Permalink
Refactor purify_alter_source
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Oct 21, 2024
1 parent c2283aa commit 99fb784
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
GenericSourceConnection, PostgresSourceConnection, SourceConnection,
GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
SourceExportStatementDetails,
};
use prost::Message;
Expand All @@ -75,8 +75,8 @@ use crate::ast::{
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
ResolvedItemName,
Aug, FullItemName, PartialItemName, QualifiedItemName, ResolvedColumnReference,
ResolvedDataType, ResolvedIds, ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
Expand Down Expand Up @@ -1171,9 +1171,6 @@ async fn purify_create_source(
})
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
///
/// On success, returns the details on new subsources and updated
/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
async fn purify_alter_source(
Expand Down Expand Up @@ -1220,9 +1217,47 @@ async fn purify_alter_source(
print_id: true,
version: RelationVersionSelector::Latest,
};
let connection_name = desc.connection.name();

// Validate this is a source that can be altered.
match action {
AlterSourceAction::AddSubsources {
external_references,
options,
} => {
purify_alter_source_add_subsources(
&scx,
external_references,
options,
desc,
source_name,
unresolved_source_name,
resolved_source_name,
storage_configuration,
)
.await
}
_ => Ok(PurifiedStatement::PurifiedAlterSource {
alter_source_stmt: AlterSourceStatement {
source_name: unresolved_source_name,
action,
if_exists,
},
}),
}
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
async fn purify_alter_source_add_subsources(
scx: &StatementContext<'_>,
external_references: Vec<ExternalReferenceExport>,
mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
desc: SourceDesc,
source_name: &QualifiedItemName,
unresolved_source_name: UnresolvedItemName,
resolved_source_name: ResolvedItemName,
storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
// Validate this is a source that can have subsources added.
match desc.connection {
GenericSourceConnection::Postgres(PostgresSourceConnection {
connection:
Expand All @@ -1239,20 +1274,7 @@ async fn purify_alter_source(
),
};

// If we don't need to handle added subsources, early return.
let AlterSourceAction::AddSubsources {
external_references,
mut options,
} = action
else {
return Ok(PurifiedStatement::PurifiedAlterSource {
alter_source_stmt: AlterSourceStatement {
source_name: unresolved_source_name,
action,
if_exists,
},
});
};
let connection_name = desc.connection.name();

let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
text_columns,
Expand Down

0 comments on commit 99fb784

Please sign in to comment.