From 93f4270280441921c688dc5c6e5a684e69e6bcf0 Mon Sep 17 00:00:00 2001
From: dengkai02
Date: Thu, 27 Apr 2023 11:19:23 +0800
Subject: [PATCH 1/5] issue-622: add optimize command in python binding

---
 python/deltalake/table.py | 13 +++++++++++++
 python/src/lib.rs         | 23 +++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index d2462e546c..9b4fe5944e 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -313,6 +313,19 @@ def vacuum(
 
         return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
 
+    def optimize(
+            self,
+            partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
+            target_size: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        """
+        :param partition_filters: the partition filters that will be used for getting the matched files
+        :param target_size: desired file size after bin-packing files
+        :return: the metrics from optimize
+        """
+        metrics = self._table.optimize(partition_filters, target_size)
+        return json.loads(metrics)
+
     def pyarrow_schema(self) -> pyarrow.Schema:
         """
         Get the current schema of the DeltaTable with the Parquet PyArrow format.
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ad57f7f647..f372775577 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,6 +17,7 @@ use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::operations::transaction::commit;
+use deltalake::operations::optimize::OptimizeBuilder;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::partitions::PartitionFilter;
 use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, Invariant, Schema};
@@ -310,6 +311,28 @@ impl RawDeltaTable {
         Ok(metrics.files_deleted)
     }
 
+    /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
+    #[pyo3(signature = (partition_filters = None, target_size = None))]
+    pub fn optimize(
+        &mut self,
+        partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
+        target_size: Option<i64>,
+    ) -> PyResult<String>{
+        let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
+        if let Some(size) = target_size {
+            cmd = cmd.with_target_size(size);
+        }
+        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
+            .map_err(PyDeltaTableError::from_raw)?;
+        cmd = cmd.with_filters(&converted_filters);
+
+        let (table, metrics) = rt()?
+            .block_on(async { cmd.await })
+            .map_err(PyDeltaTableError::from_raw)?;
+        self._table.state = table.state;
+        Ok(serde_json::to_string(&metrics).unwrap())
+    }
+
     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()?

From e5f929e9736a00d62244f9cdd3da27facc5b3d41 Mon Sep 17 00:00:00 2001
From: dengkai02
Date: Thu, 27 Apr 2023 11:19:23 +0800
Subject: [PATCH 2/5] feat: add optimize command in python binding

---
 python/docs/source/usage.rst | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst
index f026e4626c..7901cbce0c 100644
--- a/python/docs/source/usage.rst
+++ b/python/docs/source/usage.rst
@@ -399,7 +399,23 @@ only list the files to be deleted.
 Pass ``dry_run=False`` to actually delete files.
 
 Optimizing tables
 ~~~~~~~~~~~~~~~~~
 
-Optimizing tables is not currently supported.
+Optimizing a table will perform bin-packing on a Delta Table, which merges small files
+into a large file. Bin-packing reduces the number of API calls required for read operations.
+Optimizing increments the table's version and creates remove actions for optimized files.
+Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`.
+
+Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that currently optimize only supports
+append-only workflows. Using it with other workflows may corrupt your table state.
+
+.. code-block:: python
+
+    >>> dt = DeltaTable("../rust/tests/data/simple_table")
+    >>> dt.optimize()
+    {'numFilesAdded': 1, 'numFilesRemoved': 5,
+     'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
+     'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
+     'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
+     'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
 
 Writing Delta Tables
 --------------------

From 85618c8ac156cfb2ad20848a0d831996cfc2a7cd Mon Sep 17 00:00:00 2001
From: dengkai02
Date: Thu, 27 Apr 2023 11:19:23 +0800
Subject: [PATCH 3/5] feat: add optimize command in python binding

---
 python/tests/test_optimize.py | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
 create mode 100644 python/tests/test_optimize.py

diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
new file mode 100644
index 0000000000..32504d7d99
--- /dev/null
+++ b/python/tests/test_optimize.py
@@ -0,0 +1,27 @@
+import pathlib
+
+import pyarrow as pa
+import pytest
+
+from deltalake import DeltaTable, write_deltalake
+
+
+@pytest.mark.parametrize("use_relative", [True, False])
+def test_optimize_run_table(
+    tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
+):
+    if use_relative:
+        monkeypatch.chdir(tmp_path)  # Make tmp_path the working directory
+        (tmp_path / "path/to/table").mkdir(parents=True)
+        table_path = "./path/to/table"
+    else:
+        table_path = str(tmp_path)
+
+    write_deltalake(table_path, sample_data, mode="append")
+    write_deltalake(table_path, sample_data, mode="append")
+    write_deltalake(table_path, sample_data, mode="append")
+
+    dt = DeltaTable(table_path)
+    dt.optimize()
+    last_action = dt.history(1)[0]
+    assert last_action['operation'] == 'OPTIMIZE'

From 92f63c0621bf58e22d5f770e45b04092d2310a84 Mon Sep 17 00:00:00 2001
From: dengkai02
Date: Thu, 27 Apr 2023 11:19:23 +0800
Subject: [PATCH 4/5] feat: add optimize command in python binding

---
 python/deltalake/table.py    | 12 +++++++++++-
 python/docs/source/usage.rst |  4 ++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 9b4fe5944e..f0c7498458 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -319,8 +319,18 @@ def optimize(
             target_size: Optional[int] = None,
     ) -> Dict[str, Any]:
         """
+        Compacts small files to reduce the total number of files in the table.
+
+        This operation is idempotent; if run twice on the same table (assuming it has
+        not been updated) it will do nothing the second time.
+
+        If this operation happens concurrently with any operations other than append,
+        it will fail.
+
         :param partition_filters: the partition filters that will be used for getting the matched files
-        :param target_size: desired file size after bin-packing files
+        :param target_size: desired file size after bin-packing files, in bytes. If not
+            provided, will attempt to read the table configuration value ``delta.targetFileSize``.
+            If that value isn't set, will use the default value of 256MB.
         :return: the metrics from optimize
         """
         metrics = self._table.optimize(partition_filters, target_size)
diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst
index 7901cbce0c..53731dc1d0 100644
--- a/python/docs/source/usage.rst
+++ b/python/docs/source/usage.rst
@@ -404,8 +404,8 @@ into a large file. Bin-packing reduces the number of API calls required for read
 Optimizing increments the table's version and creates remove actions for optimized files.
 Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`.
 
-Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that currently optimize only supports
-append-only workflows. Using it with other workflows may corrupt your table state.
+Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
+concurrent writer performs an operation that removes any files (such as an overwrite).
 
 .. code-block:: python
 

From 7d049aef02dac07d0a60525692c9dc0942fe0ef6 Mon Sep 17 00:00:00 2001
From: dengkai02
Date: Thu, 27 Apr 2023 11:19:23 +0800
Subject: [PATCH 5/5] feat: add optimize command in python binding

---
 python/deltalake/table.py     | 6 +++---
 python/src/lib.rs             | 4 ++--
 python/tests/test_optimize.py | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index f0c7498458..97e98c000a 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -314,9 +314,9 @@ def vacuum(
         return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
 
     def optimize(
-            self,
-            partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
-            target_size: Optional[int] = None,
+        self,
+        partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
+        target_size: Optional[int] = None,
     ) -> Dict[str, Any]:
         """
         Compacts small files to reduce the total number of files in the table.
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 9fc0d51e9f..bf4c307dba 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -16,8 +16,8 @@ use deltalake::builder::DeltaTableBuilder;
 use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
-use deltalake::operations::transaction::commit;
 use deltalake::operations::optimize::OptimizeBuilder;
+use deltalake::operations::transaction::commit;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::partitions::PartitionFilter;
 use deltalake::{
@@ -321,7 +321,7 @@ impl RawDeltaTable {
         &mut self,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
-    ) -> PyResult<String>{
+    ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index 32504d7d99..1c04aa5545 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -24,4 +24,4 @@ def test_optimize_run_table(
     dt = DeltaTable(table_path)
     dt.optimize()
     last_action = dt.history(1)[0]
-    assert last_action['operation'] == 'OPTIMIZE'
+    assert last_action["operation"] == "OPTIMIZE"
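
Note (not part of the patches): a minimal sketch of how the new binding could be exercised once this series lands. The table path, partition column, sample data, and target size below are illustrative assumptions, not values taken from the patches:

    import pyarrow as pa

    from deltalake import DeltaTable, write_deltalake

    # Write a few small appends so there is something to compact.
    data = pa.table({"part": ["a", "a", "b"], "value": [1, 2, 3]})
    for _ in range(3):
        write_deltalake("/tmp/optimize_demo", data, mode="append", partition_by=["part"])

    dt = DeltaTable("/tmp/optimize_demo")
    # Compact only the part="a" partition, aiming for ~100 MB output files.
    metrics = dt.optimize(
        partition_filters=[("part", "=", "a")],
        target_size=100 * 1024 * 1024,
    )
    print(metrics["numFilesAdded"], metrics["numFilesRemoved"])

The returned dictionary has the same shape as the example output shown in the usage.rst change above (``numFilesAdded``, ``numFilesRemoved``, ``filesAdded``, ``filesRemoved``, and so on).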