Skip to content

Commit

Permalink
sql: RETAIN HISTORY in CREATE MATERIALIZED VIEW
Browse files Browse the repository at this point in the history
  • Loading branch information
maddyblue committed Dec 14, 2023
1 parent 633db6e commit e6f8ee0
Show file tree
Hide file tree
Showing 15 changed files with 185 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,7 @@ mod builtin_migration_tests {
resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)),
cluster_id: ClusterId::User(1),
non_null_assertions: vec![],
custom_logical_compaction_window: None,
})
}
SimplifiedItem::Index { on } => {
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ impl CatalogState {
resolved_ids,
cluster_id: materialized_view.cluster_id,
non_null_assertions: materialized_view.non_null_assertions,
custom_logical_compaction_window: materialized_view.compaction_window,
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ impl Coordinator {
column_names,
cluster_id,
non_null_assertions,
compaction_window,
},
replace: _,
drop_ids,
Expand Down Expand Up @@ -1032,6 +1033,7 @@ impl Coordinator {
resolved_ids,
cluster_id,
non_null_assertions,
custom_logical_compaction_window: compaction_window,
}),
owner_id: *session.current_role_id(),
});
Expand Down Expand Up @@ -1080,7 +1082,10 @@ impl Coordinator {
.unwrap_or_terminate("cannot fail to append");

coord
.initialize_storage_read_policies(vec![id], CompactionWindow::Default)
.initialize_storage_read_policies(
vec![id],
compaction_window.unwrap_or(CompactionWindow::Default),
)
.await;

coord.ship_dataflow(df_desc, cluster_id).await;
Expand Down
5 changes: 3 additions & 2 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ pub struct MaterializedView {
pub resolved_ids: ResolvedIds,
pub cluster_id: ClusterId,
pub non_null_assertions: Vec<usize>,
pub custom_logical_compaction_window: Option<CompactionWindow>,
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -1069,8 +1070,8 @@ impl CatalogItem {
CatalogItem::Table(table) => table.custom_logical_compaction_window,
CatalogItem::Source(source) => source.custom_logical_compaction_window,
CatalogItem::Index(index) => index.custom_logical_compaction_window,
CatalogItem::MaterializedView(_)
| CatalogItem::Log(_)
CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Type(_)
Expand Down
78 changes: 78 additions & 0 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ use mz_environmentd::test_util::{
self, MzTimestamp, PostgresErrorExt, TestServerWithRuntime, KAFKA_ADDRS,
};
use mz_ore::assert_contains;
use mz_ore::collections::CollectionExt;
use mz_ore::now::{NowFn, NOW_ZERO, SYSTEM_TIME};
use mz_ore::result::ResultExt;
use mz_ore::retry::Retry;
Expand All @@ -110,6 +111,7 @@ use serde_json::json;
use timely::order::PartialOrder;
use tokio::sync::{mpsc, oneshot};
use tokio_postgres::error::{DbError, SqlState};
use tokio_postgres::Client;
use tracing::{debug, info};

/// An HTTP server whose responses can be controlled from another thread.
Expand Down Expand Up @@ -3526,3 +3528,79 @@ fn test_peek_on_dropped_indexed_view() {
})
.unwrap();
}

// Test that RETAIN HISTORY results in the since and upper being separated by the specified amount.
#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
async fn test_retain_history() {
let server = test_util::TestHarness::default().start().await;
let client = server.connect().await.unwrap();
let sys_client = server
.connect()
.internal()
.user(&SYSTEM_USER.name)
.await
.unwrap();

assert!(client
.batch_execute(
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '2s') AS SELECT * FROM t",
)
.await
.is_err());

sys_client
.batch_execute("ALTER SYSTEM SET enable_logical_compaction_window = true")
.await
.unwrap();

client
.batch_execute("CREATE TABLE t (a INT4)")
.await
.unwrap();
client
.batch_execute("INSERT INTO t VALUES (1)")
.await
.unwrap();

assert_contains!(
client
.batch_execute(
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '-2s') AS SELECT * FROM t",
)
.await
.unwrap_err()
.to_string(),
"invalid RETAIN HISTORY"
);

client
.batch_execute(
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '2s') AS SELECT * FROM t",
)
.await
.unwrap();

async fn get_ts(client: &Client) -> TimestampExplanation<Timestamp> {
let row = client
.query_one("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM v;", &[])
.await
.unwrap();
let explain: String = row.get(0);
serde_json::from_str(&explain).unwrap()
}

Retry::default()
.retry_async(|_| async {
let ts = get_ts(&client).await;
let source = ts.sources.into_element();
let upper = source.write_frontier.into_element();
let since = source.read_frontier.into_element();
if upper.saturating_sub(since) >= Timestamp::from(2000u64) {
return Ok(());
}
tokio::time::sleep(Duration::from_secs(1)).await;
Err::<(), String>(format!("{upper} - {since} should be atleast 2s apart"))
})
.await
.unwrap();
}
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ Groups
Having
Header
Headers
History
Hold
Host
Hour
Expand Down Expand Up @@ -308,6 +309,7 @@ Replication
Reset
Respect
Restrict
Retain
Return
Returning
Revoke
Expand Down
2 changes: 2 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use crate::ast::{AstInfo, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOpti
pub enum MaterializedViewOptionName {
/// The `ASSERT NOT NULL [=] <ident>` option.
AssertNotNull,
RetainHistory,
}

impl AstDisplay for MaterializedViewOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3129,6 +3129,7 @@ pub enum WithOptionValue<T: AstInfo> {
// Special cases.
ClusterReplicas(Vec<ReplicaDefinition<T>>),
ConnectionKafkaBroker(KafkaBroker<T>),
RetainHistoryFor(Value),
}

impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
Expand All @@ -3137,7 +3138,9 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
// When adding branches to this match statement, think about whether it is OK for us to collect
// the value as part of our telemetry. Check the data management policy to be sure!
match self {
WithOptionValue::Value(_) | WithOptionValue::Sequence(_) => {
WithOptionValue::Value(_)
| WithOptionValue::Sequence(_)
| WithOptionValue::RetainHistoryFor(_) => {
// These are redact-aware.
}
WithOptionValue::DataType(_)
Expand Down Expand Up @@ -3179,6 +3182,10 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
WithOptionValue::ConnectionKafkaBroker(broker) => {
f.write_node(broker);
}
WithOptionValue::RetainHistoryFor(value) => {
f.write_str("FOR ");
f.write_node(value);
}
}
}
}
Expand Down
30 changes: 28 additions & 2 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3225,18 +3225,44 @@ impl<'a> Parser<'a> {
fn parse_materialized_view_option_name(
&mut self,
) -> Result<MaterializedViewOptionName, ParserError> {
self.expect_keywords(&[ASSERT, NOT, NULL])?;
Ok(MaterializedViewOptionName::AssertNotNull)
let option = self.expect_one_of_keywords(&[ASSERT, RETAIN])?;
let name = match option {
ASSERT => {
self.expect_keywords(&[NOT, NULL])?;
MaterializedViewOptionName::AssertNotNull
}
RETAIN => {
self.expect_keyword(HISTORY)?;
MaterializedViewOptionName::RetainHistory
}
_ => unreachable!(),
};
Ok(name)
}

fn parse_materialized_view_option(
&mut self,
) -> Result<MaterializedViewOption<Raw>, ParserError> {
let name = self.parse_materialized_view_option_name()?;
if name == MaterializedViewOptionName::RetainHistory {
return self.parse_materialized_view_option_retain_history();
}
let value = self.parse_optional_option_value()?;
Ok(MaterializedViewOption { name, value })
}

fn parse_materialized_view_option_retain_history(
&mut self,
) -> Result<MaterializedViewOption<Raw>, ParserError> {
let _ = self.consume_token(&Token::Eq);
self.expect_keyword(FOR)?;
let value = self.parse_value()?;
Ok(MaterializedViewOption {
name: MaterializedViewOptionName::RetainHistory,
value: Some(WithOptionValue::RetainHistoryFor(value)),
})
}

fn parse_create_index(&mut self) -> Result<Statement<Raw>, ParserError> {
let default_index = self.parse_keyword(DEFAULT);
self.expect_keyword(INDEX)?;
Expand Down
4 changes: 2 additions & 2 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [] })

parse-statement roundtrip
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b) AS SELECT 1
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1
----
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b) AS SELECT 1
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1

parse-statement
CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', ROLE ARN 'role-arn', SECRET ACCESS KEY 'key', TOKEN 'token')
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,7 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
.collect(),
),
ConnectionKafkaBroker(broker) => ConnectionKafkaBroker(self.fold_kafka_broker(broker)),
RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,7 @@ pub struct MaterializedView {
pub column_names: Vec<ColumnName>,
pub cluster_id: ClusterId,
pub non_null_assertions: Vec<usize>,
pub compaction_window: Option<CompactionWindow>,
}

#[derive(Clone, Debug)]
Expand Down
11 changes: 10 additions & 1 deletion src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2056,12 +2056,19 @@ pub fn plan_create_materialized_view(

let MaterializedViewOptionExtracted {
assert_not_null,
retain_history,
seen: _,
}: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

if !assert_not_null.is_empty() {
scx.require_feature_flag(&crate::session::vars::ENABLE_ASSERT_NOT_NULL)?;
}
let compaction_window = retain_history
.map(|cw| {
scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
Ok::<_, PlanError>(cw.try_into()?)
})
.transpose()?;
let mut non_null_assertions = assert_not_null
.into_iter()
.map(normalize::column_name)
Expand Down Expand Up @@ -2150,6 +2157,7 @@ pub fn plan_create_materialized_view(
column_names,
cluster_id,
non_null_assertions,
compaction_window,
},
replace,
drop_ids,
Expand All @@ -2160,7 +2168,8 @@ pub fn plan_create_materialized_view(

generate_extracted_config!(
MaterializedViewOption,
(AssertNotNull, Ident, AllowMultiple)
(AssertNotNull, Ident, AllowMultiple),
(RetainHistory, Duration)
);

pub fn describe_create_sink(
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/plan/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOpti
},
V::name()
),
WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
}
}
fn name() -> String {
Expand Down
Loading

0 comments on commit e6f8ee0

Please sign in to comment.