fix: refresh snapshot after vacuuming logs
Signed-off-by: Ion Koutsouris <[email protected]>
ion-elgreco committed Feb 22, 2025
1 parent a82aedd commit 62f9604
Showing 6 changed files with 221 additions and 50 deletions.
12 changes: 10 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
@@ -74,7 +74,6 @@
//! └───────────────────────────────┘
//!</pre>
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
@@ -766,7 +765,7 @@ impl PostCommit {
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState { snapshot };
let mut state = DeltaTableState { snapshot };

let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
cleanup_logs
@@ -809,6 +808,15 @@ impl PostCommit {
Some(post_commit_operation_id),
)
.await? as u64;
if num_log_files_cleaned_up > 0 {
state = DeltaTableState::try_new(
&state.snapshot().table_root(),
self.log_store.object_store(None),
state.load_config().clone(),
Some(self.version),
)
.await?;
}
}

// Run arbitrary after_post_commit_hook code
17 changes: 16 additions & 1 deletion python/deltalake/_internal.pyi
@@ -234,6 +234,22 @@ class RawDeltaTable:
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def __datafusion_table_provider__(self) -> Any: ...
def write(
self,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
commit_properties: Optional[CommitProperties],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...


def rust_core_version() -> str: ...
def write_new_deltalake(
@@ -253,7 +269,6 @@ def write_to_deltalake(
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
49 changes: 31 additions & 18 deletions python/deltalake/writer.py
@@ -320,25 +320,38 @@ def write_deltalake(
conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
)
data = RecordBatchReader.from_batches(schema, (batch for batch in data))
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
table._table.write(
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
else:
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
elif engine == "pyarrow":
warnings.warn(
"pyarrow engine is deprecated and will be removed in v1.0",
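
With this change, write_deltalake in writer.py dispatches on the type of the target: a DeltaTable instance is written through its own RawDeltaTable.write, which also refreshes the table's in-memory state, while a plain URI still goes through the standalone write_to_deltalake path. A minimal usage sketch of the two call patterns (the path and data below are illustrative, not part of this diff):

    import pyarrow as pa
    from deltalake import DeltaTable, write_deltalake

    batch = pa.table({"foo": [1, 2, 3]})

    # URI-based write: the table is resolved from the path on each call.
    write_deltalake("/tmp/example_table", batch, mode="append")

    # Object-based write: the same DeltaTable is committed to and its snapshot
    # is updated in place, so no manual update_incremental() is needed afterwards.
    dt = DeltaTable("/tmp/example_table")
    write_deltalake(dt, batch, mode="append")
    print(dt.version())  # reflects the new commit without reloading the table
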
145 changes: 118 additions & 27 deletions python/src/lib.rs
@@ -1402,7 +1402,7 @@ impl RawDeltaTable {
}

pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
py.allow_threads(|| {
let (_result, new_state) = py.allow_threads(|| {
let operation_id = Uuid::new_v4();
let handle = Arc::new(LakeFSCustomExecuteHandler {});
let store = &self.log_store()?;
@@ -1419,10 +1419,29 @@

let result = rt().block_on(async {
match self._table.lock() {
Ok(table) => cleanup_metadata(&table, Some(operation_id))
.await
.map_err(PythonError::from)
.map_err(PyErr::from),
Ok(table) => {
let result = cleanup_metadata(&table, Some(operation_id))
.await
.map_err(PythonError::from)
.map_err(PyErr::from)?;

let new_state = if result > 0 {
Some(
DeltaTableState::try_new(
&table.state.clone().unwrap().snapshot().table_root(),
table.object_store(),
table.config.clone(),
Some(table.version()),
)
.await
.map_err(PythonError::from)?,
)
} else {
None
};

Ok((result, new_state))
}
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
}
});
@@ -1439,6 +1458,10 @@ impl RawDeltaTable {
result
})?;

if new_state.is_some() {
self.set_state(new_state)?;
}

Ok(())
}

@@ -1610,6 +1633,92 @@ impl RawDeltaTable {
Ok(())
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
fn write(
&mut self,
py: Python,
data: PyArrowType<ArrowArrayStreamReader>,
mode: String,
schema_mode: Option<String>,
partition_by: Option<Vec<String>>,
predicate: Option<String>,
target_file_size: Option<usize>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
writer_properties: Option<PyWriterProperties>,
commit_properties: Option<PyCommitProperties>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let save_mode = mode.parse().map_err(PythonError::from)?;

let mut builder = WriteBuilder::new(self.log_store()?, Some(self.cloned_state()?))
.with_save_mode(save_mode);

let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?;

let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None)
.map_err(PythonError::from)?
.build()
.map_err(PythonError::from)?;

builder = builder.with_input_execution_plan(Arc::new(plan));

if let Some(schema_mode) = schema_mode {
builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
}
if let Some(partition_columns) = partition_by {
builder = builder.with_partition_columns(partition_columns);
}

if let Some(writer_props) = writer_properties {
builder = builder.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}

if let Some(name) = &name {
builder = builder.with_table_name(name);
};

if let Some(description) = &description {
builder = builder.with_description(description);
};

if let Some(predicate) = predicate {
builder = builder.with_replace_where(predicate);
};

if let Some(target_file_size) = target_file_size {
builder = builder.with_target_file_size(target_file_size)
};

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

if let Some(commit_properties) =
maybe_create_commit_properties(commit_properties, post_commithook_properties)
{
builder = builder.with_commit_properties(commit_properties);
};

if self.log_store()?.name() == "LakeFSLogStore" {
builder =
builder.with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}))
}

rt().block_on(builder.into_future())
.map_err(PythonError::from)
.map_err(PyErr::from)
})?;

self.set_state(table.state)?;
Ok(())
}

fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
@@ -2137,13 +2246,12 @@ pub struct PyCommitProperties {

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (table_uri, data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
fn write_to_deltalake(
py: Python,
table_uri: String,
data: PyArrowType<ArrowArrayStreamReader>,
mode: String,
table: Option<&RawDeltaTable>,
schema_mode: Option<String>,
partition_by: Option<Vec<String>>,
predicate: Option<String>,
@@ -2160,39 +2268,22 @@ fn write_to_deltalake(
let save_mode = mode.parse().map_err(PythonError::from)?;

let options = storage_options.clone().unwrap_or_default();
let table = if let Some(table) = table {
table.with_table(|t| Ok(DeltaOps::from(t.clone())))?
} else {
rt().block_on(DeltaOps::try_from_uri_with_storage_options(
let table = rt()
.block_on(DeltaOps::try_from_uri_with_storage_options(
&table_uri, options,
))
.map_err(PythonError::from)?
};

// let dont_be_so_lazy = match table.0.state.as_ref() {
// Some(state) => state.table_config().enable_change_data_feed() && predicate.is_some(),
// // You don't have state somehow, so I guess it's okay to be lazy.
// _ => false,
// };
.map_err(PythonError::from)?;

let mut builder =
WriteBuilder::new(table.0.log_store(), table.0.state).with_save_mode(save_mode);

// if dont_be_so_lazy {
// debug!(
// "write_to_deltalake() is not able to lazily perform a write, collecting batches"
// );
// builder = builder.with_input_batches(data.0.map(|batch| batch.unwrap()));
// } else {

let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?;

let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None)
.map_err(PythonError::from)?
.build()
.map_err(PythonError::from)?;
builder = builder.with_input_execution_plan(Arc::new(plan));
// }

if let Some(schema_mode) = schema_mode {
builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
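
On the Python binding side, cleanup_metadata now reloads the DeltaTableState whenever expired log files were actually deleted, so the same DeltaTable object can keep being used without pointing at removed commit files. A hedged sketch of the flow this protects (table path, data, and retention settings are illustrative):

    import pyarrow as pa
    from deltalake import DeltaTable, write_deltalake

    path = "/tmp/short_retention_table"
    config = {"delta.logRetentionDuration": "interval 0 days"}

    # Create the table with a very short log retention, then add a second commit.
    write_deltalake(path, pa.table({"foo": [0]}), mode="append", configuration=config)
    write_deltalake(path, pa.table({"foo": [1]}), mode="append")

    dt = DeltaTable(path)
    dt.create_checkpoint()   # logs before the checkpoint become eligible for removal
    dt.cleanup_metadata()    # deletes expired logs and now refreshes dt's snapshot

    # With the refreshed snapshot, a follow-up vacuum no longer references the
    # log files that were just removed.
    dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
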
29 changes: 29 additions & 0 deletions python/tests/test_checkpoint.py
@@ -539,3 +539,32 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path):
new_df = dt.to_pandas()
print(dt.to_pandas())
assert len(new_df) == 1, "We overwrote! there should only be one row"


@pytest.mark.polars
def test_refresh_snapshot_after_log_cleanup_3057(tmp_path):
"""https://github.com/delta-io/delta-rs/issues/3057"""
import polars as pl
configuration = {
"delta.deletedFileRetentionDuration": "interval 0 days",
"delta.logRetentionDuration": "interval 0 days",
"delta.targetFileSize": str(128 * 1024 * 1024),
}

for i in range(2):
df = pl.DataFrame({"foo": [i]})
df.write_delta(str(tmp_path), delta_write_options={"configuration": configuration},mode="append")

# create checkpoint so that logs before checkpoint can get removed
dt = DeltaTable(tmp_path)
dt.create_checkpoint()

# Write to table again, snapshot should be correctly refreshed so that clean_up metadata can run after this
df = pl.DataFrame({"foo": [1]})
df.write_delta(dt, mode="append")

# Vacuum is noop, since we already removed logs before and snapshot doesn't reference them anymore

vacuum_log = dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)

assert vacuum_log == []
19 changes: 17 additions & 2 deletions python/tests/test_threaded.py
@@ -47,13 +47,28 @@


@pytest.mark.polars
def test_multithreaded_write(sample_data: pa.Table, tmp_path: pathlib.Path):
def test_multithreaded_write_using_table(sample_data: pa.Table, tmp_path: pathlib.Path):
import polars as pl

table = pl.DataFrame({"a": [1, 2, 3]}).to_arrow()
write_deltalake(tmp_path, table, mode="overwrite")

dt = DeltaTable(tmp_path)

with pytest.raises(RuntimeError, match="Already mutably borrowed"):
with ThreadPoolExecutor() as exe:
list(exe.map(lambda _: write_deltalake(dt, table, mode="append"), range(5)))


@pytest.mark.polars
def test_multithreaded_write_using_path(sample_data: pa.Table, tmp_path: pathlib.Path):
import polars as pl

table = pl.DataFrame({"a": [1, 2, 3]}).to_arrow()
write_deltalake(tmp_path, table, mode="overwrite")

dt = DeltaTable(tmp_path)

with ThreadPoolExecutor() as exe:
list(exe.map(lambda _: write_deltalake(dt, table, mode="append"), range(5)))
list(exe.map(lambda _: write_deltalake(tmp_path, table, mode="append"), range(5)))
