From 196b715e43c8a20881b5bbf0c37b2a11681f6e7c Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 7 Apr 2024 20:42:44 +0200 Subject: [PATCH 1/3] advance in the post commit --- crates/core/src/operations/constraints.rs | 9 ++- crates/core/src/operations/delete.rs | 25 +++--- .../core/src/operations/drop_constraints.rs | 9 ++- crates/core/src/operations/merge/mod.rs | 25 +++--- crates/core/src/operations/transaction/mod.rs | 79 +++++++++++-------- crates/core/src/operations/update.rs | 29 +++---- crates/core/src/operations/write.rs | 5 +- 7 files changed, 87 insertions(+), 94 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 91539ef1a6..9cc9672dfe 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -198,9 +198,12 @@ impl std::future::IntoFuture for ConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - this.snapshot - .merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) + let table = if let Some(new_snapshot) = commit.snapshot() { + DeltaTable::new_with_state(this.log_store, new_snapshot) + } else { + DeltaTable::new_with_state(this.log_store, this.snapshot) + }; + Ok(table) }) } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 8d13d51b4e..980dd92e60 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -190,7 +190,7 @@ async fn execute( state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, -) -> DeltaResult<((Vec, i64, Option), DeleteMetrics)> { +) -> DeltaResult<(Option, DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); @@ -258,21 +258,14 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; if actions.is_empty() { - return Ok(((actions, snapshot.version(), None), metrics)); + return Ok((None, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) .build(Some(snapshot), log_store, operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } impl std::future::IntoFuture for DeleteBuilder { @@ -305,7 +298,7 @@ impl std::future::IntoFuture for DeleteBuilder { None => None, }; - let ((actions, version, operation), metrics) = execute( + let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), &this.snapshot, @@ -315,11 +308,11 @@ impl std::future::IntoFuture for DeleteBuilder { ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let table = if let Some(new_snapshot) = new_snapshot { + DeltaTable::new_with_state(this.log_store, new_snapshot) + } else { + DeltaTable::new_with_state(this.log_store, this.snapshot) + }; Ok((table, metrics)) }) } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 081262753e..4f0e82bfb5 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -91,9 +91,12 @@ impl std::future::IntoFuture for DropConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - this.snapshot - .merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)) + let table = if let Some(new_snapshot) = commit.snapshot() { + DeltaTable::new_with_state(this.log_store, new_snapshot) + } else { + DeltaTable::new_with_state(this.log_store, this.snapshot) + }; + Ok(table) }) } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 44c3b499c1..f49a96a1f3 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -942,7 +942,7 @@ async fn execute( match_operations: Vec, not_match_target_operations: Vec, not_match_source_operations: Vec, -) -> DeltaResult<((Vec, i64, Option), MergeMetrics)> { +) -> DeltaResult<(Option, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); @@ -1449,21 +1449,14 @@ async fn execute( }; if actions.is_empty() { - return Ok(((actions, snapshot.version(), None), metrics)); + return Ok((None, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) .build(Some(snapshot), log_store.clone(), operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr { @@ -1521,7 +1514,7 @@ impl std::future::IntoFuture for MergeBuilder { session.state() }); - let ((actions, version, operation), metrics) = execute( + let (new_snapshot, metrics) = execute( this.predicate, this.source, this.log_store.clone(), @@ -1538,11 +1531,11 @@ impl std::future::IntoFuture for MergeBuilder { ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); - + let table = if let Some(new_snapshot) = new_snapshot { + DeltaTable::new_with_state(this.log_store, new_snapshot) + } else { + DeltaTable::new_with_state(this.log_store, this.snapshot) + }; Ok((table, metrics)) }) } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6d5f7f731a..4057ce362b 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -595,36 +595,45 @@ pub struct PostCommit<'a> { impl<'a> PostCommit<'a> { /// Runs the post commit activities - async fn run_post_commit_hook( - &self, - version: i64, - commit_data: &CommitData, - ) -> DeltaResult<()> { - if self.create_checkpoint { - self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data) - .await? + async fn run_post_commit_hook(&self) -> DeltaResult> { + if let Some(table) = self.table_data { + if let Some(mut snapshot) = table.eager_snapshot().cloned() { + if self.version - snapshot.version() > 1 { + // This may only occur during concurrent write actions. We need to update the state first to - 1 + // then we can advance. + snapshot + .update(self.log_store.clone(), Some(self.version - 1)) + .await?; + snapshot.advance(vec![&self.data])?; + } else { + snapshot.advance(vec![&self.data])?; + } + let state = DeltaTableState { + app_transaction_version: HashMap::new(), + snapshot, + }; + // Execute each hook + if self.create_checkpoint { + self.create_checkpoint(&state, &self.log_store, self.version) + .await?; + } + return Ok(Some(state)); + } else { + return Ok(None); + } + } else { + return Ok(None); } - Ok(()) } async fn create_checkpoint( &self, - table: &Option<&'a dyn TableReference>, + table_state: &DeltaTableState, log_store: &LogStoreRef, version: i64, - commit_data: &CommitData, ) -> DeltaResult<()> { - if let Some(table) = table { - let checkpoint_interval = table.config().checkpoint_interval() as i64; - if ((version + 1) % checkpoint_interval) == 0 { - // We have to advance the snapshot otherwise we can't create a checkpoint - let mut snapshot = table.eager_snapshot().unwrap().clone(); - snapshot.advance(vec![commit_data])?; - let state = DeltaTableState { - app_transaction_version: HashMap::new(), - snapshot, - }; - create_checkpoint_for(version, &state, log_store.as_ref()).await? - } + let checkpoint_interval = table_state.config().checkpoint_interval() as i64; + if ((version + 1) % checkpoint_interval) == 0 { + create_checkpoint_for(version, table_state, log_store.as_ref()).await? } Ok(()) } @@ -632,22 +641,22 @@ impl<'a> PostCommit<'a> { /// A commit that successfully completed pub struct FinalizedCommit { - /// The winning version number of the commit + /// The new table state after a commmit + pub snapshot: Option, + + /// Version of the finalized commit pub version: i64, - /// The data that was comitted to the log store - pub data: CommitData, } impl FinalizedCommit { - /// The materialized version of the commit + /// The new table state after a commmit + pub fn snapshot(&self) -> Option { + self.snapshot.clone() + } + /// Version of the finalized commit pub fn version(&self) -> i64 { self.version } - - /// Data used to write the commit - pub fn data(&self) -> &CommitData { - &self.data - } } impl<'a> std::future::IntoFuture for PostCommit<'a> { @@ -658,11 +667,11 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { let this = self; Box::pin(async move { - match this.run_post_commit_hook(this.version, &this.data).await { - Ok(_) => { + match this.run_post_commit_hook().await { + Ok(snapshot) => { return Ok(FinalizedCommit { + snapshot, version: this.version, - data: this.data, }) } Err(err) => return Err(err), diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 9f4f6d51a3..a23355c4bf 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -41,7 +41,7 @@ use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde::Serialize; -use super::transaction::PROTOCOL; +use super::transaction::{FinalizedCommit, PROTOCOL}; use super::write::write_execution_plan; use super::{ datafusion_utils::Expression, @@ -173,7 +173,7 @@ async fn execute( writer_properties: Option, mut commit_properties: CommitProperties, safe_cast: bool, -) -> DeltaResult<((Vec, i64, Option), UpdateMetrics)> { +) -> DeltaResult<(Option, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -189,7 +189,7 @@ async fn execute( let version = snapshot.version(); if updates.is_empty() { - return Ok(((Vec::new(), version, None), metrics)); + return Ok((None, metrics)); } let predicate = match predicate { @@ -218,7 +218,7 @@ async fn execute( metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { - return Ok(((Vec::new(), version, None), metrics)); + return Ok((None, metrics)); } let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -419,14 +419,7 @@ async fn execute( .build(Some(snapshot), log_store, operation)? .await?; - Ok(( - ( - commit.data.actions, - commit.version, - Some(commit.data.operation), - ), - metrics, - )) + Ok((commit.snapshot(), metrics)) } impl std::future::IntoFuture for UpdateBuilder { @@ -449,7 +442,7 @@ impl std::future::IntoFuture for UpdateBuilder { session.state() }); - let ((actions, version, operation), metrics) = execute( + let (new_snapshot, metrics) = execute( this.predicate, this.updates, this.log_store.clone(), @@ -461,11 +454,11 @@ impl std::future::IntoFuture for UpdateBuilder { ) .await?; - if let Some(op) = &operation { - this.snapshot.merge(actions, op, version)?; - } - - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let table = if let Some(new_snapshot) = new_snapshot { + DeltaTable::new_with_state(this.log_store, new_snapshot) + } else { + DeltaTable::new_with_state(this.log_store, this.snapshot) + }; Ok((table, metrics)) }) } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 475abefbe2..a3e3cb3a9e 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -811,9 +811,8 @@ impl std::future::IntoFuture for WriteBuilder { // TODO we do not have the table config available, but since we are merging only our newly // created actions, it may be safe to assume, that we want to include all actions. // then again, having only some tombstones may be misleading. - if let Some(mut snapshot) = this.snapshot { - snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?; - Ok(DeltaTable::new_with_state(this.log_store, snapshot)) + if let (Some(mut snapshot), Some(new_snapshot)) = (this.snapshot, commit.snapshot) { + Ok(DeltaTable::new_with_state(this.log_store, new_snapshot)) } else { let mut table = DeltaTable::new(this.log_store, Default::default()); table.update().await?; From 80e5eba736123837636aa4591e084fd8af91f9a8 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 7 Apr 2024 20:54:43 +0200 Subject: [PATCH 2/3] update test --- crates/core/tests/command_merge.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index c6e7f09f2f..988891f332 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -177,17 +177,7 @@ async fn test_merge_concurrent_different_partition() { // TODO: Currently it throws a Version mismatch error, but the merge commit was successfully // This bug needs to be fixed, see pull request #2280 - assert!(!matches!( - result.as_ref().unwrap_err(), - DeltaTableError::Transaction { .. } - )); - assert!(matches!( - result.as_ref().unwrap_err(), - DeltaTableError::Generic(_) - )); - if let DeltaTableError::Generic(msg) = result.unwrap_err() { - assert_eq!(msg, "Version mismatch"); - } + assert!(matches!(result.as_ref().is_ok(), true)); } #[tokio::test] From 32a62c25bb770b803a638243dc71ce92282f4bd1 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 23:22:09 +0200 Subject: [PATCH 3/3] always return state --- crates/core/src/operations/constraints.rs | 10 +-- crates/core/src/operations/delete.rs | 24 +++-- .../core/src/operations/drop_constraints.rs | 10 +-- crates/core/src/operations/merge/mod.rs | 28 +++--- crates/core/src/operations/transaction/mod.rs | 88 +++++++++---------- crates/core/src/operations/update.rs | 30 +++---- crates/core/src/operations/write.rs | 11 +-- 7 files changed, 89 insertions(+), 112 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 9cc9672dfe..6ab7199f47 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -198,12 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 980dd92e60..c9a4792d11 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -186,16 +186,16 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, -) -> DeltaResult<(Option, DeleteMetrics)> { +) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -205,7 +205,7 @@ async fn execute( } else { let write_start = Instant::now(); let add = excute_non_empty_expr( - snapshot, + &snapshot, log_store.clone(), &state, &predicate, @@ -258,12 +258,12 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot.clone(), metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -301,19 +301,17 @@ impl std::future::IntoFuture for DeleteBuilder { let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, new_snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 4f0e82bfb5..f1d320d2ff 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -91,12 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index f49a96a1f3..9d4d5aeb8d 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -932,7 +932,7 @@ async fn execute( predicate: Expression, source: DataFrame, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, @@ -942,7 +942,7 @@ async fn execute( match_operations: Vec, not_match_target_operations: Vec, not_match_source_operations: Vec, -) -> DeltaResult<(Option, MergeMetrics)> { +) -> DeltaResult<(DeltaTableState, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); @@ -987,7 +987,7 @@ async fn execute( let scan_config = DeltaScanConfigBuilder::default() .with_file_column(true) .with_parquet_pushdown(false) - .build(snapshot)?; + .build(&snapshot)?; let target_provider = Arc::new(DeltaTableProvider::try_new( snapshot.clone(), @@ -1017,7 +1017,7 @@ async fn execute( } else { try_construct_early_filter( predicate.clone(), - snapshot, + &snapshot, &state, &source, &source_name, @@ -1370,7 +1370,7 @@ async fn execute( let rewrite_start = Instant::now(); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), write, table_partition_cols.clone(), @@ -1449,12 +1449,12 @@ async fn execute( }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store.clone(), operation)? + .build(Some(&snapshot), log_store.clone(), operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -1514,11 +1514,11 @@ impl std::future::IntoFuture for MergeBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.source, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -1531,12 +1531,10 @@ impl std::future::IntoFuture for MergeBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 4057ce362b..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync { fn metadata(&self) -> &Metadata; /// Try to cast this table reference to a `EagerSnapshot` - fn eager_snapshot(&self) -> Option<&EagerSnapshot>; + fn eager_snapshot(&self) -> &EagerSnapshot; } impl TableReference for EagerSnapshot { @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot { self.table_config() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(self) + fn eager_snapshot(&self) -> &EagerSnapshot { + self } } @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState { self.snapshot.metadata() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(&self.snapshot) + fn eager_snapshot(&self) -> &EagerSnapshot { + &self.snapshot } } @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { // unwrap() is safe here due to the above check // TODO: refactor to only depend on TableReference Trait - let read_snapshot = - this.table_data - .unwrap() - .eager_snapshot() - .ok_or(DeltaTableError::Generic( - "Expected an instance of EagerSnapshot".to_owned(), - ))?; + let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { @@ -595,34 +589,38 @@ pub struct PostCommit<'a> { impl<'a> PostCommit<'a> { /// Runs the post commit activities - async fn run_post_commit_hook(&self) -> DeltaResult> { + async fn run_post_commit_hook(&self) -> DeltaResult { if let Some(table) = self.table_data { - if let Some(mut snapshot) = table.eager_snapshot().cloned() { - if self.version - snapshot.version() > 1 { - // This may only occur during concurrent write actions. We need to update the state first to - 1 - // then we can advance. - snapshot - .update(self.log_store.clone(), Some(self.version - 1)) - .await?; - snapshot.advance(vec![&self.data])?; - } else { - snapshot.advance(vec![&self.data])?; - } - let state = DeltaTableState { - app_transaction_version: HashMap::new(), - snapshot, - }; - // Execute each hook - if self.create_checkpoint { - self.create_checkpoint(&state, &self.log_store, self.version) - .await?; - } - return Ok(Some(state)); + let mut snapshot = table.eager_snapshot().clone(); + if self.version - snapshot.version() > 1 { + // This may only occur during concurrent write actions. We need to update the state first to - 1 + // then we can advance. + snapshot + .update(self.log_store.clone(), Some(self.version - 1)) + .await?; + snapshot.advance(vec![&self.data])?; } else { - return Ok(None); + snapshot.advance(vec![&self.data])?; } + let state = DeltaTableState { + app_transaction_version: HashMap::new(), + snapshot, + }; + // Execute each hook + if self.create_checkpoint { + self.create_checkpoint(&state, &self.log_store, self.version) + .await?; + } + Ok(state) } else { - return Ok(None); + let state = DeltaTableState::try_new( + &Path::default(), + self.log_store.object_store(), + Default::default(), + Some(self.version), + ) + .await?; + Ok(state) } } async fn create_checkpoint( @@ -642,7 +640,7 @@ impl<'a> PostCommit<'a> { /// A commit that successfully completed pub struct FinalizedCommit { /// The new table state after a commmit - pub snapshot: Option, + pub snapshot: DeltaTableState, /// Version of the finalized commit pub version: i64, @@ -650,7 +648,7 @@ pub struct FinalizedCommit { impl FinalizedCommit { /// The new table state after a commmit - pub fn snapshot(&self) -> Option { + pub fn snapshot(&self) -> DeltaTableState { self.snapshot.clone() } /// Version of the finalized commit @@ -668,14 +666,12 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { Box::pin(async move { match this.run_post_commit_hook().await { - Ok(snapshot) => { - return Ok(FinalizedCommit { - snapshot, - version: this.version, - }) - } - Err(err) => return Err(err), - }; + Ok(snapshot) => Ok(FinalizedCommit { + snapshot, + version: this.version, + }), + Err(err) => Err(err), + } }) } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index a23355c4bf..9847eda104 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -168,12 +168,12 @@ async fn execute( predicate: Option, updates: HashMap, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, safe_cast: bool, -) -> DeltaResult<(Option, UpdateMetrics)> { +) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -189,7 +189,7 @@ async fn execute( let version = snapshot.version(); if updates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = match predicate { @@ -214,11 +214,11 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -226,7 +226,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) + let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -350,7 +350,7 @@ async fn execute( )?); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), projection.clone(), table_partition_cols.clone(), @@ -416,7 +416,7 @@ async fn execute( let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) @@ -442,11 +442,11 @@ impl std::future::IntoFuture for UpdateBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.updates, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -454,12 +454,10 @@ impl std::future::IntoFuture for UpdateBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index a3e3cb3a9e..8ecfb3078b 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -808,16 +808,7 @@ impl std::future::IntoFuture for WriteBuilder { )? .await?; - // TODO we do not have the table config available, but since we are merging only our newly - // created actions, it may be safe to assume, that we want to include all actions. - // then again, having only some tombstones may be misleading. - if let (Some(mut snapshot), Some(new_snapshot)) = (this.snapshot, commit.snapshot) { - Ok(DeltaTable::new_with_state(this.log_store, new_snapshot)) - } else { - let mut table = DeltaTable::new(this.log_store, Default::default()); - table.update().await?; - Ok(table) - } + Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot)) }) } }