Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): advance state in post commit #2396

Merged
merged 3 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
35 changes: 13 additions & 22 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,16 @@ async fn excute_non_empty_expr(
async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), DeleteMetrics)> {
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

let scan_start = Instant::now();
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis();

let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
Expand All @@ -205,7 +205,7 @@ async fn execute(
} else {
let write_start = Instant::now();
let add = excute_non_empty_expr(
snapshot,
&snapshot,
log_store.clone(),
&state,
&predicate,
Expand Down Expand Up @@ -258,21 +258,14 @@ async fn execute(
predicate: Some(fmt_expr_to_sql(&predicate)?),
};
if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot.clone(), metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store, operation)?
.build(Some(&snapshot), log_store, operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}

impl std::future::IntoFuture for DeleteBuilder {
Expand Down Expand Up @@ -305,22 +298,20 @@ impl std::future::IntoFuture for DeleteBuilder {
None => None,
};

let ((actions, version, operation), metrics) = execute(
let (new_snapshot, metrics) = execute(
predicate,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
)
.await?;

if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}

let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, new_snapshot),
metrics,
))
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
37 changes: 14 additions & 23 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ async fn execute(
predicate: Expression,
source: DataFrame,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
Expand All @@ -942,7 +942,7 @@ async fn execute(
match_operations: Vec<MergeOperationConfig>,
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), MergeMetrics)> {
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

Expand Down Expand Up @@ -987,7 +987,7 @@ async fn execute(
let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(true)
.with_parquet_pushdown(false)
.build(snapshot)?;
.build(&snapshot)?;

let target_provider = Arc::new(DeltaTableProvider::try_new(
snapshot.clone(),
Expand Down Expand Up @@ -1017,7 +1017,7 @@ async fn execute(
} else {
try_construct_early_filter(
predicate.clone(),
snapshot,
&snapshot,
&state,
&source,
&source_name,
Expand Down Expand Up @@ -1370,7 +1370,7 @@ async fn execute(

let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(snapshot),
Some(&snapshot),
state.clone(),
write,
table_partition_cols.clone(),
Expand Down Expand Up @@ -1449,21 +1449,14 @@ async fn execute(
};

if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot, metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store.clone(), operation)?
.build(Some(&snapshot), log_store.clone(), operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}

fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
Expand Down Expand Up @@ -1521,11 +1514,11 @@ impl std::future::IntoFuture for MergeBuilder {
session.state()
});

let ((actions, version, operation), metrics) = execute(
let (snapshot, metrics) = execute(
this.predicate,
this.source,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
Expand All @@ -1538,12 +1531,10 @@ impl std::future::IntoFuture for MergeBuilder {
)
.await?;

if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);

Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, snapshot),
metrics,
))
})
}
}
Expand Down
111 changes: 58 additions & 53 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync {
fn metadata(&self) -> &Metadata;

/// Try to cast this table reference to an `EagerSnapshot`
fn eager_snapshot(&self) -> Option<&EagerSnapshot>;
fn eager_snapshot(&self) -> &EagerSnapshot;
}

impl TableReference for EagerSnapshot {
Expand All @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot {
self.table_config()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(self)
fn eager_snapshot(&self) -> &EagerSnapshot {
self
}
}

Expand All @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState {
self.snapshot.metadata()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(&self.snapshot)
fn eager_snapshot(&self) -> &EagerSnapshot {
&self.snapshot
}
}

Expand Down Expand Up @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {

// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot =
this.table_data
.unwrap()
.eager_snapshot()
.ok_or(DeltaTableError::Generic(
"Expected an instance of EagerSnapshot".to_owned(),
))?;
let read_snapshot = this.table_data.unwrap().eager_snapshot();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
Expand Down Expand Up @@ -595,59 +589,72 @@ pub struct PostCommit<'a> {

impl<'a> PostCommit<'a> {
/// Runs the post commit activities
async fn run_post_commit_hook(
&self,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if self.create_checkpoint {
self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data)
.await?
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
if let Some(table) = self.table_data {
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
// This may only occur during concurrent write actions. We first need to update the state to
// version - 1, then we can advance.
snapshot
.update(self.log_store.clone(), Some(self.version - 1))
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
.await?;
snapshot.advance(vec![&self.data])?;
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
.await?;
}
Ok(state)
} else {
let state = DeltaTableState::try_new(
&Path::default(),
self.log_store.object_store(),
Default::default(),
Some(self.version),
)
.await?;
Ok(state)
}
Ok(())
}
async fn create_checkpoint(
&self,
table: &Option<&'a dyn TableReference>,
table_state: &DeltaTableState,
log_store: &LogStoreRef,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if let Some(table) = table {
let checkpoint_interval = table.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
// We have to advance the snapshot otherwise we can't create a checkpoint
let mut snapshot = table.eager_snapshot().unwrap().clone();
snapshot.advance(vec![commit_data])?;
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
create_checkpoint_for(version, &state, log_store.as_ref()).await?
}
let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
create_checkpoint_for(version, table_state, log_store.as_ref()).await?
}
Ok(())
}
}

/// A commit that successfully completed
pub struct FinalizedCommit {
/// The winning version number of the commit
/// The new table state after a commit
pub snapshot: DeltaTableState,

/// Version of the finalized commit
pub version: i64,
/// The data that was committed to the log store
pub data: CommitData,
}

impl FinalizedCommit {
/// The materialized version of the commit
/// The new table state after a commit
pub fn snapshot(&self) -> DeltaTableState {
self.snapshot.clone()
}
/// Version of the finalized commit
pub fn version(&self) -> i64 {
self.version
}

/// Data used to write the commit
pub fn data(&self) -> &CommitData {
&self.data
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
Expand All @@ -658,15 +665,13 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {
let this = self;

Box::pin(async move {
match this.run_post_commit_hook(this.version, &this.data).await {
Ok(_) => {
return Ok(FinalizedCommit {
version: this.version,
data: this.data,
})
}
Err(err) => return Err(err),
};
match this.run_post_commit_hook().await {
Ok(snapshot) => Ok(FinalizedCommit {
snapshot,
version: this.version,
}),
Err(err) => Err(err),
}
})
}
}
Expand Down
Loading
Loading