Skip to content

Commit

Permalink
storage/adapter: Introduce 'ALTER SOURCE .. REFRESH REFERENCES' state…
Browse files Browse the repository at this point in the history
…ment for refreshing available upstream refS
  • Loading branch information
rjobanp committed Oct 21, 2024
1 parent 99fb784 commit 518a131
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 10 deletions.
12 changes: 11 additions & 1 deletion src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,15 @@ impl Coordinator {
)
.await
}
PurifiedStatement::PurifiedAlterSourceRefreshReferences {
source_name,
available_source_references,
} => self.plan_purified_alter_source_refresh_references(
ctx.session(),
params,
source_name,
available_source_references,
),
o @ (PurifiedStatement::PurifiedAlterSource { .. }
| PurifiedStatement::PurifiedCreateSink(..)
| PurifiedStatement::PurifiedCreateTableFromSource { .. }) => {
Expand All @@ -544,7 +553,8 @@ impl Coordinator {
}
PurifiedStatement::PurifiedCreateSink(stmt) => Statement::CreateSink(stmt),
PurifiedStatement::PurifiedCreateSource { .. }
| PurifiedStatement::PurifiedAlterSourceAddSubsources { .. } => {
| PurifiedStatement::PurifiedAlterSourceAddSubsources { .. }
| PurifiedStatement::PurifiedAlterSourceRefreshReferences { .. } => {
unreachable!("not part of exterior match stmt")
}
};
Expand Down
32 changes: 32 additions & 0 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,28 @@ impl Coordinator {
))
}

/// Prepares an `ALTER SOURCE...REFRESH REFERENCES`.
pub(crate) fn plan_purified_alter_source_refresh_references(
&self,
_session: &Session,
_params: Params,
source_name: ResolvedItemName,
available_source_references: plan::SourceReferences,
) -> Result<(Plan, ResolvedIds), AdapterError> {
let source_id = *source_name.item_id();
let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
references: available_source_references,
};

Ok((
Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
id: source_id,
action,
}),
ResolvedIds(BTreeSet::new()),
))
}

/// Prepares a `CREATE SOURCE` statement to create its progress subsource,
/// the primary source, and any ingestion export subsources (e.g. PG
/// tables).
Expand Down Expand Up @@ -3907,6 +3929,16 @@ impl Coordinator {
)
.await;
}
plan::AlterSourceAction::RefreshReferences { references } => {
self.catalog_transact(
Some(session),
vec![catalog::Op::UpdateSourceReferences {
source_id: id,
references: references.into(),
}],
)
.await?;
}
}

Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
Expand Down
4 changes: 4 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,7 @@ pub enum AlterSourceAction<T: AstInfo> {
cascade: bool,
names: Vec<UnresolvedItemName>,
},
RefreshReferences,
}

impl<T: AstInfo> AstDisplay for AlterSourceAction<T> {
Expand Down Expand Up @@ -2749,6 +2750,9 @@ impl<T: AstInfo> AstDisplay for AlterSourceAction<T> {
f.write_str(")");
}
}
AlterSourceAction::RefreshReferences => {
f.write_str("REFRESH REFERENCES");
}
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5161,7 +5161,7 @@ impl<'a> Parser<'a> {

Ok(
match self
.expect_one_of_keywords(&[ADD, DROP, RESET, SET, RENAME, OWNER])
.expect_one_of_keywords(&[ADD, DROP, RESET, SET, RENAME, OWNER, REFRESH])
.map_no_statement_parser_err()?
{
ADD => {
Expand Down Expand Up @@ -5287,6 +5287,15 @@ impl<'a> Parser<'a> {
new_owner,
})
}
REFRESH => {
self.expect_keyword(REFERENCES)
.map_parser_err(StatementKind::AlterSource)?;
Statement::AlterSource(AlterSourceStatement {
source_name,
if_exists,
action: AlterSourceAction::RefreshReferences,
})
}
_ => unreachable!(),
},
)
Expand Down
21 changes: 21 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -3187,6 +3187,27 @@ error: Expected identifier, found star
ALTER SOURCE * SET CLUSTER
^

parse-statement
ALTER SOURCE src REFRESH REFERENCES db.clsname
----
error: Expected end of statement, found identifier "db"
ALTER SOURCE src REFRESH REFERENCES db.clsname
^

parse-statement
ALTER SOURCE db.src REFRESH REFERENCES
----
ALTER SOURCE db.src REFRESH REFERENCES
=>
AlterSource(AlterSourceStatement { source_name: UnresolvedItemName([Ident("db"), Ident("src")]), if_exists: false, action: RefreshReferences })

parse-statement
ALTER SOURCE IF EXISTS src REFRESH REFERENCES
----
ALTER SOURCE IF EXISTS src REFRESH REFERENCES
=>
AlterSource(AlterSourceStatement { source_name: UnresolvedItemName([Ident("src")]), if_exists: true, action: RefreshReferences })

parse-statement
ALTER SINK snk SET CLUSTER clsname
----
Expand Down
3 changes: 3 additions & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ pub enum AlterSourceAction {
subsources: Vec<CreateSourcePlanBundle>,
options: Vec<AlterSourceAddSubsourceOption<Aug>>,
},
RefreshReferences {
references: SourceReferences,
},
}

#[derive(Debug)]
Expand Down
3 changes: 3 additions & 0 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6567,6 +6567,9 @@ pub fn plan_alter_source(
AlterSourceAction::AddSubsources { .. } => {
unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
}
AlterSourceAction::RefreshReferences => {
unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
}
};
}

Expand Down
102 changes: 94 additions & 8 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ use crate::ast::{
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
Aug, FullItemName, PartialItemName, QualifiedItemName, ResolvedColumnReference,
ResolvedDataType, ResolvedIds, ResolvedItemName,
Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
Expand Down Expand Up @@ -212,6 +212,11 @@ pub enum PurifiedStatement {
// Map of subsource names to external details
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
},
PurifiedAlterSourceRefreshReferences {
source_name: ResolvedItemName,
/// The updated available upstream references for the primary source.
available_source_references: SourceReferences,
},
PurifiedCreateSink(CreateSinkStatement<Aug>),
PurifiedCreateTableFromSource {
stmt: CreateTableFromSourceStatement<Aug>,
Expand Down Expand Up @@ -1218,23 +1223,32 @@ async fn purify_alter_source(
version: RelationVersionSelector::Latest,
};

let partial_name = scx.catalog.minimal_qualification(source_name);

match action {
AlterSourceAction::AddSubsources {
external_references,
options,
} => {
purify_alter_source_add_subsources(
&scx,
external_references,
options,
desc,
source_name,
partial_name,
unresolved_source_name,
resolved_source_name,
storage_configuration,
)
.await
}
AlterSourceAction::RefreshReferences => {
purify_alter_source_refresh_references(
desc,
resolved_source_name,
storage_configuration,
)
.await
}
_ => Ok(PurifiedStatement::PurifiedAlterSource {
alter_source_stmt: AlterSourceStatement {
source_name: unresolved_source_name,
Expand All @@ -1248,11 +1262,10 @@ async fn purify_alter_source(
// TODO(database-issues#8620): Remove once subsources are removed
/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
async fn purify_alter_source_add_subsources(
scx: &StatementContext<'_>,
external_references: Vec<ExternalReferenceExport>,
mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
desc: SourceDesc,
source_name: &QualifiedItemName,
partial_source_name: PartialItemName,
unresolved_source_name: UnresolvedItemName,
resolved_source_name: ResolvedItemName,
storage_configuration: &StorageConfiguration,
Expand All @@ -1270,7 +1283,7 @@ async fn purify_alter_source_add_subsources(
GenericSourceConnection::MySql(_) => {}
_ => sql_bail!(
"source {} does not support ALTER SOURCE.",
scx.catalog.minimal_qualification(source_name),
partial_source_name
),
};

Expand Down Expand Up @@ -1317,7 +1330,7 @@ async fn purify_alter_source_add_subsources(
if !exclude_columns.is_empty() {
sql_bail!(
"{} is a {} source, which does not support EXCLUDE COLUMNS.",
scx.catalog.minimal_qualification(source_name),
partial_source_name,
connection_name
)
}
Expand Down Expand Up @@ -1424,6 +1437,79 @@ async fn purify_alter_source_add_subsources(
})
}

async fn purify_alter_source_refresh_references(
desc: SourceDesc,
resolved_source_name: ResolvedItemName,
storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
let retrieved_source_references = match desc.connection {
GenericSourceConnection::Postgres(pg_source_connection) => {
// Get PostgresConnection for generating subsources.
let pg_connection = &pg_source_connection.connection;

let config = pg_connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;

let client = config
.connect(
"postgres_purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;
let reference_client = SourceReferenceClient::Postgres {
client: &client,
publication: &pg_source_connection.publication,
database: &pg_connection.database,
};
reference_client.get_source_references().await?
}
GenericSourceConnection::MySql(mysql_source_connection) => {
let mysql_connection = &mysql_source_connection.connection;
let config = mysql_connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;

let mut conn = config
.connect(
"mysql purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;

let reference_client = SourceReferenceClient::MySql {
conn: &mut conn,
include_system_schemas: false,
};
reference_client.get_source_references().await?
}
GenericSourceConnection::LoadGenerator(load_gen_connection) => {
let reference_client = SourceReferenceClient::LoadGenerator {
generator: &load_gen_connection.load_generator,
};
reference_client.get_source_references().await?
}
GenericSourceConnection::Kafka(kafka_conn) => {
let reference_client = SourceReferenceClient::Kafka {
topic: &kafka_conn.topic,
};
reference_client.get_source_references().await?
}
};
Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
source_name: resolved_source_name,
available_source_references: retrieved_source_references.available_source_references(),
})
}

async fn purify_create_table_from_source(
catalog: impl SessionCatalog,
mut stmt: CreateTableFromSourceStatement<Aug>,
Expand Down
30 changes: 30 additions & 0 deletions test/testdrive/source-tables.td
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,42 @@ var1 5678 <null>
var0 5555 6666
var1 4444 12

# Validate that the available source references table reflects the state of the upstream
# when the create source was run
> SELECT u.name, u.namespace, u.columns
FROM mz_sources s
JOIN mz_internal.mz_source_references u ON s.id = u.source_id
WHERE
s.name = 'mysql_source'
ORDER BY u.name;
mysql_table public {a,b}

# Create a new table in MySQL and refresh to see if the new available reference is picked up
$ mysql-execute name=mysql
USE public;
CREATE TABLE mysql_table_new (foo INTEGER, bar INTEGER);

> ALTER SOURCE mysql_source REFRESH REFERENCES;

> SELECT u.name, u.namespace, u.columns
FROM mz_sources s
JOIN mz_internal.mz_source_references u ON s.id = u.source_id
WHERE
s.name = 'mysql_source'
ORDER BY u.name;
mysql_table public {a,b,c}
mysql_table_new public {foo,bar}


> DROP SOURCE mysql_source CASCADE;

# ensure that CASCADE propagates to the tables
! SELECT * FROM mysql_table_3;
contains:unknown catalog item 'mysql_table_3'

> SELECT count(*) FROM mz_internal.mz_source_references;
0

# TODO(def-) Remove when database-issues#8515 and database-issues#8526 are fixed
$ skip-end

Expand Down

0 comments on commit 518a131

Please sign in to comment.