sql: RETAIN HISTORY for materialized views #23788

Merged 1 commit on Dec 17, 2023.
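This PR adds a `RETAIN HISTORY` option to `CREATE MATERIALIZED VIEW`. The option is parsed as `RETAIN HISTORY [=] FOR '<interval>'`, planned into an optional `CompactionWindow`, recorded on the catalog item as `custom_logical_compaction_window`, and applied when the coordinator initializes the storage read policy for the new view, so that the view's read frontier (`since`) trails its write frontier (`upper`) by the requested window instead of the default. The option is gated behind the `enable_logical_compaction_window` feature flag, and invalid durations such as `'-2s'` are rejected with an `invalid RETAIN HISTORY` error, as exercised by the new test.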
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
@@ -1909,6 +1909,7 @@ mod builtin_migration_tests {
                 resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)),
                 cluster_id: ClusterId::User(1),
                 non_null_assertions: vec![],
+                custom_logical_compaction_window: None,
             })
         }
         SimplifiedItem::Index { on } => {
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
@@ -934,6 +934,7 @@ impl CatalogState {
                     resolved_ids,
                     cluster_id: materialized_view.cluster_id,
                     non_null_assertions: materialized_view.non_null_assertions,
+                    custom_logical_compaction_window: materialized_view.compaction_window,
                 })
             }
             Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
7 changes: 6 additions & 1 deletion src/adapter/src/coord/sequencer/inner.rs
@@ -955,6 +955,7 @@ impl Coordinator {
                     column_names,
                     cluster_id,
                     non_null_assertions,
+                    compaction_window,
                 },
             replace: _,
             drop_ids,
@@ -1032,6 +1033,7 @@
                 resolved_ids,
                 cluster_id,
                 non_null_assertions,
+                custom_logical_compaction_window: compaction_window,
             }),
             owner_id: *session.current_role_id(),
         });
@@ -1083,7 +1085,10 @@
             .unwrap_or_terminate("cannot fail to append");

         coord
-            .initialize_storage_read_policies(vec![id], CompactionWindow::Default)
+            .initialize_storage_read_policies(
+                vec![id],
+                compaction_window.unwrap_or(CompactionWindow::Default),
+            )
             .await;

         if coord.catalog().state().system_config().enable_mz_notices() {
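The coordinator change above is where the option takes effect: instead of always using `CompactionWindow::Default`, the storage read policy for the new view now lags by the user-supplied window when one is given. A minimal, self-contained sketch of the resulting frontier arithmetic follows; the function name, millisecond units, and the 1s default are illustrative assumptions for this sketch, not the Materialize API, whose policy machinery lives in the storage controller:

```rust
/// Illustrative only: the earliest readable time (`since`) implied by a
/// compaction window, given the current write frontier (`upper`), both in ms.
fn allowed_since(upper_ms: u64, retain_history_ms: Option<u64>) -> u64 {
    // Mirrors `compaction_window.unwrap_or(CompactionWindow::Default)` above;
    // the 1s default is an assumption for this sketch.
    const DEFAULT_WINDOW_MS: u64 = 1_000;
    upper_ms.saturating_sub(retain_history_ms.unwrap_or(DEFAULT_WINDOW_MS))
}

fn main() {
    // With RETAIN HISTORY = FOR '5s', a query `AS OF upper - 2000` stays
    // readable, which is what the new test in src/environmentd/tests/sql.rs
    // asserts via EXPLAIN TIMESTAMP.
    let upper = 10_000;
    assert!(allowed_since(upper, Some(5_000)) <= upper - 2_000);
}
```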
5 changes: 3 additions & 2 deletions src/catalog/src/memory/objects.rs
@@ -702,6 +702,7 @@ pub struct MaterializedView {
     pub resolved_ids: ResolvedIds,
     pub cluster_id: ClusterId,
     pub non_null_assertions: Vec<usize>,
+    pub custom_logical_compaction_window: Option<CompactionWindow>,
 }

 #[derive(Debug, Clone, Serialize)]
@@ -1094,8 +1095,8 @@ impl CatalogItem {
             CatalogItem::Table(table) => table.custom_logical_compaction_window,
             CatalogItem::Source(source) => source.custom_logical_compaction_window,
             CatalogItem::Index(index) => index.custom_logical_compaction_window,
-            CatalogItem::MaterializedView(_)
-            | CatalogItem::Log(_)
+            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
+            CatalogItem::Log(_)
             | CatalogItem::View(_)
             | CatalogItem::Sink(_)
             | CatalogItem::Type(_)
111 changes: 109 additions & 2 deletions src/environmentd/tests/sql.rs
@@ -93,10 +93,11 @@ use http::StatusCode;
 use itertools::Itertools;
 use mz_adapter::{TimestampContext, TimestampExplanation};
 use mz_environmentd::test_util::{
-    self, try_get_explain_timestamp, MzTimestamp, PostgresErrorExt, TestServerWithRuntime,
-    KAFKA_ADDRS,
+    self, get_explain_timestamp, get_explain_timestamp_determination, try_get_explain_timestamp,
+    MzTimestamp, PostgresErrorExt, TestServerWithRuntime, KAFKA_ADDRS,
 };
 use mz_ore::assert_contains;
+use mz_ore::collections::CollectionExt;
 use mz_ore::now::{NowFn, NOW_ZERO, SYSTEM_TIME};
 use mz_ore::result::ResultExt;
 use mz_ore::retry::Retry;
@@ -3588,3 +3589,109 @@ async fn test_explain_as_of() {
         .await
         .unwrap();
 }
+
+// Test that RETAIN HISTORY results in the since and upper being separated by the specified amount.
+#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
+async fn test_retain_history() {
+    let server = test_util::TestHarness::default().start().await;
+    let client = server.connect().await.unwrap();
+    let sys_client = server
+        .connect()
+        .internal()
+        .user(&SYSTEM_USER.name)
+        .await
+        .unwrap();
+
+    // Must fail before flag set.
+    assert!(client
+        .batch_execute(
+            "CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '2s') AS SELECT * FROM t",
+        )
+        .await
+        .is_err());
+
+    sys_client
+        .batch_execute("ALTER SYSTEM SET enable_logical_compaction_window = true")
+        .await
+        .unwrap();
+
+    client
+        .batch_execute("CREATE TABLE t (a INT4)")
+        .await
+        .unwrap();
+    client
+        .batch_execute("INSERT INTO t VALUES (1)")
+        .await
+        .unwrap();
+
+    assert_contains!(
+        client
+            .batch_execute(
+                "CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '-2s') AS SELECT * FROM t",
+            )
+            .await
+            .unwrap_err()
+            .to_string(),
+        "invalid RETAIN HISTORY"
+    );
+
+    client
+        .batch_execute(
+            "CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '5s') AS SELECT * FROM t",
+        )
+        .await
+        .unwrap();
+
+    // Test compaction and querying without an index present.
+    Retry::default()
+        .retry_async(|_| async {
+            let ts = get_explain_timestamp_determination("v", &client).await?;
+            let source = ts.sources.into_element();
+            let upper = source.write_frontier.into_element();
+            let since = source.read_frontier.into_element();
+            if upper.saturating_sub(since) < Timestamp::from(2000u64) {
+                anyhow::bail!("{upper} - {since} should be at least 2s apart")
+            }
+            client
+                .query(
+                    &format!(
+                        "SELECT * FROM v AS OF {}-2000",
+                        ts.determination.timestamp_context.timestamp_or_default()
+                    ),
+                    &[],
+                )
+                .await?;
+            Ok(())
+        })
+        .await
+        .unwrap();
+
+    // With an index the AS OF query should fail because we haven't taught the planner about retain
+    // history yet.
+    client
+        .batch_execute("CREATE INDEX i ON v (a)")
+        .await
+        .unwrap();
+
+    let ts = get_explain_timestamp("v", &client).await;
+    assert_contains!(
+        client
+            .query(&format!("SELECT * FROM v AS OF {ts}-2000"), &[])
+            .await
+            .unwrap_err()
+            .to_string(),
+        "not valid for all inputs"
+    );
+
+    // Make sure we didn't fail just because the index didn't have enough time after creation.
+    tokio::time::sleep(Duration::from_secs(3)).await;
+    let ts = get_explain_timestamp("v", &client).await;
+    assert_contains!(
+        client
+            .query(&format!("SELECT * FROM v AS OF {ts}-2000"), &[])
+            .await
+            .unwrap_err()
+            .to_string(),
+        "not valid for all inputs"
+    );
+}
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
@@ -164,6 +164,7 @@ Groups
 Having
 Header
 Headers
+History
 Hold
 Host
 Hour
@@ -309,6 +310,7 @@ Replication
 Reset
 Respect
 Restrict
+Retain
 Return
 Returning
 Revoke
2 changes: 2 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -30,12 +30,14 @@ use crate::ast::{AstInfo, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOpti
 pub enum MaterializedViewOptionName {
     /// The `ASSERT NOT NULL [=] <ident>` option.
     AssertNotNull,
+    RetainHistory,
 }

 impl AstDisplay for MaterializedViewOptionName {
     fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
         match self {
             MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
+            MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
         }
     }
 }
9 changes: 8 additions & 1 deletion src/sql-parser/src/ast/defs/statement.rs
@@ -3129,6 +3129,7 @@ pub enum WithOptionValue<T: AstInfo> {
     // Special cases.
     ClusterReplicas(Vec<ReplicaDefinition<T>>),
     ConnectionKafkaBroker(KafkaBroker<T>),
+    RetainHistoryFor(Value),
 }

 impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
@@ -3137,7 +3138,9 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
         // When adding branches to this match statement, think about whether it is OK for us to collect
         // the value as part of our telemetry. Check the data management policy to be sure!
         match self {
-            WithOptionValue::Value(_) | WithOptionValue::Sequence(_) => {
+            WithOptionValue::Value(_)
+            | WithOptionValue::Sequence(_)
+            | WithOptionValue::RetainHistoryFor(_) => {
                 // These are redact-aware.
             }
             WithOptionValue::DataType(_)
@@ -3179,6 +3182,10 @@
             WithOptionValue::ConnectionKafkaBroker(broker) => {
                 f.write_node(broker);
             }
+            WithOptionValue::RetainHistoryFor(value) => {
+                f.write_str("FOR ");
+                f.write_node(value);
+            }
         }
     }
 }
30 changes: 28 additions & 2 deletions src/sql-parser/src/parser.rs
@@ -3231,18 +3231,44 @@
     fn parse_materialized_view_option_name(
         &mut self,
     ) -> Result<MaterializedViewOptionName, ParserError> {
-        self.expect_keywords(&[ASSERT, NOT, NULL])?;
-        Ok(MaterializedViewOptionName::AssertNotNull)
+        let option = self.expect_one_of_keywords(&[ASSERT, RETAIN])?;
+        let name = match option {
+            ASSERT => {
+                self.expect_keywords(&[NOT, NULL])?;
+                MaterializedViewOptionName::AssertNotNull
+            }
+            RETAIN => {
+                self.expect_keyword(HISTORY)?;
+                MaterializedViewOptionName::RetainHistory
+            }
+            _ => unreachable!(),
+        };
+        Ok(name)
     }

     fn parse_materialized_view_option(
         &mut self,
     ) -> Result<MaterializedViewOption<Raw>, ParserError> {
         let name = self.parse_materialized_view_option_name()?;
+        if name == MaterializedViewOptionName::RetainHistory {
+            return self.parse_materialized_view_option_retain_history();
+        }
         let value = self.parse_optional_option_value()?;
         Ok(MaterializedViewOption { name, value })
     }

+    fn parse_materialized_view_option_retain_history(
+        &mut self,
+    ) -> Result<MaterializedViewOption<Raw>, ParserError> {
+        let _ = self.consume_token(&Token::Eq);
+        self.expect_keyword(FOR)?;
+        let value = self.parse_value()?;
+        Ok(MaterializedViewOption {
+            name: MaterializedViewOptionName::RetainHistory,
+            value: Some(WithOptionValue::RetainHistoryFor(value)),
+        })
+    }
+
     fn parse_create_index(&mut self) -> Result<Statement<Raw>, ParserError> {
         let default_index = self.parse_keyword(DEFAULT);
         self.expect_keyword(INDEX)?;
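One detail worth calling out: the `=` is optional, since `consume_token(&Token::Eq)` deliberately ignores its result. Both `RETAIN HISTORY FOR '1s'` and `RETAIN HISTORY = FOR '1s'` therefore parse, and `AstDisplay` canonicalizes to the `=` form, which is what the roundtrip test below checks.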
4 changes: 2 additions & 2 deletions src/sql-parser/tests/testdata/ddl
@@ -382,9 +382,9 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1
 CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [] })

 parse-statement roundtrip
-CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b) AS SELECT 1
+CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1
 ----
-CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b) AS SELECT 1
+CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1

 parse-statement
 CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', SECRET ACCESS KEY 'key', TOKEN 'token')
1 change: 1 addition & 0 deletions src/sql/src/names.rs
@@ -1783,6 +1783,7 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
                 .collect(),
             ),
             ConnectionKafkaBroker(broker) => ConnectionKafkaBroker(self.fold_kafka_broker(broker)),
+            RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)),
         }
     }
1 change: 1 addition & 0 deletions src/sql/src/plan.rs
@@ -1435,6 +1435,7 @@ pub struct MaterializedView {
     pub column_names: Vec<ColumnName>,
     pub cluster_id: ClusterId,
     pub non_null_assertions: Vec<usize>,
+    pub compaction_window: Option<CompactionWindow>,
 }

 #[derive(Clone, Debug)]
11 changes: 10 additions & 1 deletion src/sql/src/plan/statement/ddl.rs
@@ -2056,12 +2056,19 @@ pub fn plan_create_materialized_view(

     let MaterializedViewOptionExtracted {
         assert_not_null,
+        retain_history,
         seen: _,
     }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

     if !assert_not_null.is_empty() {
         scx.require_feature_flag(&crate::session::vars::ENABLE_ASSERT_NOT_NULL)?;
     }
+    let compaction_window = retain_history
+        .map(|cw| {
+            scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
+            Ok::<_, PlanError>(cw.try_into()?)
+        })
+        .transpose()?;
     let mut non_null_assertions = assert_not_null
         .into_iter()
         .map(normalize::column_name)
@@ -2150,6 +2157,7 @@
             column_names,
             cluster_id,
             non_null_assertions,
+            compaction_window,
         },
         replace,
         drop_ids,
@@ -2160,7 +2168,8 @@

 generate_extracted_config!(
     MaterializedViewOption,
-    (AssertNotNull, Ident, AllowMultiple)
+    (AssertNotNull, Ident, AllowMultiple),
+    (RetainHistory, Duration)
 );

 pub fn describe_create_sink(
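The `map` + `transpose` in `plan_create_materialized_view` is the standard idiom for applying a fallible, feature-flag-gated conversion to an optional value: an absent option stays `None`, while a present-but-invalid duration becomes a `PlanError`. A minimal sketch under assumed types follows; the real `CompactionWindow`, `Duration`, and error plumbing are Materialize's, and the zero-duration rejection below is only a stand-in for whatever validation produces the "invalid RETAIN HISTORY" error seen in the test:

```rust
use std::time::Duration;

#[derive(Debug)]
struct CompactionWindow(Duration);

#[derive(Debug)]
struct PlanError(String);

impl TryFrom<Duration> for CompactionWindow {
    type Error = PlanError;
    fn try_from(d: Duration) -> Result<Self, Self::Error> {
        // Stand-in validation; the real rule rejects e.g. negative intervals.
        if d.is_zero() {
            return Err(PlanError("invalid RETAIN HISTORY".into()));
        }
        Ok(CompactionWindow(d))
    }
}

fn plan_retain_history(
    retain_history: Option<Duration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    retain_history
        .map(CompactionWindow::try_from) // Option<Result<_, _>>
        .transpose() // Result<Option<_>, _>
}

fn main() {
    assert!(plan_retain_history(None).unwrap().is_none());
    assert!(plan_retain_history(Some(Duration::from_secs(5))).unwrap().is_some());
    assert!(plan_retain_history(Some(Duration::ZERO)).is_err());
}
```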
1 change: 1 addition & 0 deletions src/sql/src/plan/with_options.rs
@@ -446,6 +446,7 @@ impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOpti
                 },
                 V::name()
             ),
+            WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
         }
     }
     fn name() -> String {
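Because `RetainHistoryFor` simply forwards its inner `Value`, the existing `TryFromValue` machinery does the actual parsing; that is what lets `generate_extracted_config!` above treat `RetainHistory` as an ordinary `Duration` option.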