feat: add optimize command in python binding #1313

Merged · 6 commits · May 4, 2023
23 changes: 23 additions & 0 deletions python/deltalake/table.py
@@ -313,6 +313,29 @@ def vacuum(

        return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)

    def optimize(
        self,
        partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
        target_size: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Compacts small files to reduce the total number of files in the table.

        This operation is idempotent; if run twice on the same table (assuming it
        has not been updated) it will do nothing the second time.

        If this operation happens concurrently with any operations other than
        append, it will fail.

        :param partition_filters: the partition filters that will be used for
            getting the matched files
        :param target_size: desired file size after bin-packing files, in bytes.
            If not provided, will attempt to read the table configuration value
            ``delta.targetFileSize``. If that value isn't set, will use default
            value of 256MB.
        :return: the metrics from optimize
        """
        metrics = self._table.optimize(partition_filters, target_size)
        return json.loads(metrics)
Collaborator:
It would be nice to have this be typed for the sake of autocompletion, but doing that in Rust right now involves a bit of boilerplate. One lightweight option is https://docs.python.org/3.8/library/typing.html#typing.TypedDict, but that's Python>=3.8 only. We still have a month or two until 3.7 is EOL.

Collaborator:
Perhaps something like:

if TYPE_CHECKING and sys.version_info >= (3, 8):
    from typing import TypedDict

    class MetricDetails(TypedDict):
        min: int
        max: int
        avg: float
        total_files: int
        total_size: int

    class OptimizeMetrics(TypedDict):
        num_files_added: int
        num_files_removed: int
        files_added: MetricDetails
        files_removed: MetricDetails
        partitions_optimized: int
        num_batches: int
        total_considered_files: int
        total_files_skipped: int
        preserve_insertion_order: bool
Then you can modify the signature:

def optimize(
    self,
    partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
    target_size: Optional[int] = None,
) -> "OptimizeMetrics":

Collaborator:
(I'm happy to leave this for a follow-up later, FYI.)

Contributor Author:
OK, I'll leave this as is in the current PR.
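For reference, a runnable variant of the reviewer's suggestion, defining the classes at runtime rather than behind ``TYPE_CHECKING`` (the ``typing_extensions`` backport for Python 3.7 is an assumption, not a dependency this PR adds):

```python
import sys

if sys.version_info >= (3, 8):
    from typing import TypedDict
else:
    # Assumption: the typing_extensions backport package for Python 3.7;
    # not part of this PR.
    from typing_extensions import TypedDict


class MetricDetails(TypedDict):
    min: int
    max: int
    avg: float
    total_files: int
    total_size: int


class OptimizeMetrics(TypedDict):
    num_files_added: int
    num_files_removed: int
    files_added: MetricDetails
    files_removed: MetricDetails
    partitions_optimized: int
    num_batches: int
    total_considered_files: int
    total_files_skipped: int
    preserve_insertion_order: bool
```

Note that the JSON documented in usage.rst uses camelCase keys (``numFilesAdded``), so either the serialized keys or these snake_case field names would need aligning before the annotation matches the runtime dict.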


    def pyarrow_schema(self) -> pyarrow.Schema:
        """
        Get the current schema of the DeltaTable with the Parquet PyArrow format.
18 changes: 17 additions & 1 deletion python/docs/source/usage.rst
@@ -399,7 +399,23 @@
only list the files to be deleted. Pass ``dry_run=False`` to actually delete files.
Optimizing tables
~~~~~~~~~~~~~~~~~

Optimizing tables is not currently supported.
Optimizing a table performs bin-packing on a Delta table, merging small files
into larger ones. Bin-packing reduces the number of API calls required for read operations.
Optimizing increments the table's version and creates remove actions for the files
that were compacted. Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`.

Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
concurrent writer performs an operation that removes any files (such as an overwrite).

.. code-block:: python

>>> dt = DeltaTable("../rust/tests/data/simple_table")
>>> dt.optimize()
{'numFilesAdded': 1, 'numFilesRemoved': 5,
'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
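For partitioned tables, the call can be scoped with the same filter tuples used by other ``DeltaTable`` methods; a sketch (the table path and the ``date`` partition column are illustrative, not from this PR):

```python
# Only compact files in one partition, with an explicit 256 MB target size.
# The partition column name ("date") and its value are illustrative.
filters = [("date", "=", "2023-05-04")]
target_size = 256 * 1024 * 1024  # 256 MB, the documented default, in bytes

# dt = DeltaTable("path/to/partitioned/table")
# metrics = dt.optimize(partition_filters=filters, target_size=target_size)
```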

Writing Delta Tables
--------------------
23 changes: 23 additions & 0 deletions python/src/lib.rs
@@ -16,6 +16,7 @@
use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
@@ -314,6 +315,28 @@ impl RawDeltaTable {
        Ok(metrics.files_deleted)
    }

    // Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
    #[pyo3(signature = (partition_filters = None, target_size = None))]
    pub fn optimize(
        &mut self,
        partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
        target_size: Option<DeltaDataTypeLong>,
    ) -> PyResult<String> {
        let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
        if let Some(size) = target_size {
            cmd = cmd.with_target_size(size);
        }
        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
            .map_err(PyDeltaTableError::from_raw)?;
        cmd = cmd.with_filters(&converted_filters);

        let (table, metrics) = rt()?
            .block_on(async { cmd.await })
            .map_err(PyDeltaTableError::from_raw)?;
        self._table.state = table.state;
        Ok(serde_json::to_string(&metrics).unwrap())
    }

    // Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
    pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
        let history = rt()?
27 changes: 27 additions & 0 deletions python/tests/test_optimize.py
@@ -0,0 +1,27 @@
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


@pytest.mark.parametrize("use_relative", [True, False])
def test_optimize_run_table(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
dt.optimize()
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
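Beyond checking the history entry, the test could also assert on the metrics returned by ``optimize()``; a sketch, with key names taken from the camelCase JSON shown in the usage docs:

```python
def check_metrics(metrics: dict) -> None:
    # At least one compacted file should have been written.
    assert metrics["numFilesAdded"] >= 1
    # Illustrative invariant that holds for the documented example:
    # every considered file is either removed or skipped.
    assert (
        metrics["numFilesRemoved"] + metrics["totalFilesSkipped"]
        == metrics["totalConsideredFiles"]
    )


# The metrics from the documented example satisfy these checks:
check_metrics(
    {
        "numFilesAdded": 1,
        "numFilesRemoved": 5,
        "totalConsideredFiles": 5,
        "totalFilesSkipped": 0,
    }
)
```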