Skip to content

Commit

Permalink
feat: Add CommitProperties data class
Browse files Browse the repository at this point in the history
  • Loading branch information
helanto authored and ion-elgreco committed Sep 7, 2024
1 parent a99c685 commit ecd2a08
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 96 deletions.
168 changes: 100 additions & 68 deletions python/deltalake/table.py

Large diffs are not rendered by default.

34 changes: 19 additions & 15 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
MAX_SUPPORTED_PYARROW_WRITER_VERSION,
NOT_SUPPORTED_PYARROW_WRITER_VERSIONS,
SUPPORTED_WRITER_FEATURES,
CommitProperties,
DeltaTable,
PostCommitHookProperties,
WriterProperties,
Expand Down Expand Up @@ -122,9 +123,8 @@ def write_deltalake(
partition_filters: Optional[List[Tuple[str, str, Any]]] = ...,
large_dtypes: bool = ...,
engine: Literal["pyarrow"] = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
commit_properties: Optional[CommitProperties] = ...,
post_commithook_properties: Optional[PostCommitHookProperties] = ...,
max_commit_retries: Optional[int] = ...,
) -> None: ...


Expand Down Expand Up @@ -152,9 +152,8 @@ def write_deltalake(
large_dtypes: bool = ...,
engine: Literal["rust"] = ...,
writer_properties: WriterProperties = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
commit_properties: Optional[CommitProperties] = ...,
post_commithook_properties: Optional[PostCommitHookProperties] = ...,
max_commit_retries: Optional[int] = ...,
) -> None: ...


Expand Down Expand Up @@ -184,9 +183,8 @@ def write_deltalake(
large_dtypes: bool = ...,
engine: Literal["rust"] = ...,
writer_properties: WriterProperties = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
commit_properties: Optional[CommitProperties] = ...,
post_commithook_properties: Optional[PostCommitHookProperties] = ...,
max_commit_retries: Optional[int] = ...,
) -> None: ...


Expand Down Expand Up @@ -222,9 +220,8 @@ def write_deltalake(
large_dtypes: bool = False,
engine: Literal["pyarrow", "rust"] = "rust",
writer_properties: Optional[WriterProperties] = None,
custom_metadata: Optional[Dict[str, str]] = None,
commit_properties: Optional[CommitProperties] = None,
post_commithook_properties: Optional[PostCommitHookProperties] = None,
max_commit_retries: Optional[int] = None,
) -> None:
"""Write to a Delta Lake table
Expand Down Expand Up @@ -279,9 +276,8 @@ def write_deltalake(
large_dtypes: Only used for pyarrow engine
engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0.
writer_properties: Pass writer properties to the Rust parquet writer.
custom_metadata: Custom metadata to add to the commitInfo.
commit_properties: properties of the transaction commit. If None, default values are used.
post_commithook_properties: properties for the post commit hook. If None, default values are used.
max_commit_retries: maximum number of times to retry the transaction commit.
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
if table is not None:
Expand Down Expand Up @@ -322,9 +318,13 @@ def write_deltalake(
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
custom_metadata=custom_metadata,
custom_metadata=commit_properties.custom_metadata
if commit_properties
else None,
post_commithook_properties=post_commithook_properties,
max_commit_retries=max_commit_retries,
max_commit_retries=commit_properties.max_commit_retries
if commit_properties
else None,
)
if table:
table.update_incremental()
Expand Down Expand Up @@ -547,7 +547,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
description,
configuration,
storage_options,
custom_metadata,
commit_properties.custom_metadata if commit_properties else None,
)
else:
table._table.create_write_transaction(
Expand All @@ -556,9 +556,13 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
partition_by or [],
schema,
partition_filters,
custom_metadata,
custom_metadata=commit_properties.custom_metadata
if commit_properties
else None,
post_commithook_properties=post_commithook_properties,
max_commit_retries=max_commit_retries,
max_commit_retries=commit_properties.max_commit_retries
if commit_properties
else None,
)
table.update_incremental()
else:
Expand Down
7 changes: 5 additions & 2 deletions python/tests/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError, DeltaProtocolError
from deltalake.schema import Field, PrimitiveType, StructType
from deltalake.table import CommitProperties


def test_add_constraint(tmp_path: pathlib.Path, sample_table: pa.Table):
Expand Down Expand Up @@ -58,8 +59,9 @@ def test_add_constraint_roundtrip_metadata(

dt = DeltaTable(tmp_path)

commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.alter.add_constraint(
{"check_price2": "price >= 0"}, custom_metadata={"userName": "John Doe"}
{"check_price2": "price >= 0"}, commit_properties=commit_properties
)

assert dt.history(1)[0]["userName"] == "John Doe"
Expand Down Expand Up @@ -112,7 +114,8 @@ def test_drop_constraint_roundtrip_metadata(
dt = DeltaTable(tmp_path)

dt.alter.add_constraint({"check_price2": "price >= 0"})
dt.alter.drop_constraint("check_price2", custom_metadata={"userName": "John Doe"})
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.alter.drop_constraint("check_price2", commit_properties=commit_properties)

assert dt.history(1)[0]["userName"] == "John Doe"

Expand Down
5 changes: 3 additions & 2 deletions python/tests/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import pyarrow.compute as pc
import pytest

from deltalake.table import DeltaTable
from deltalake.table import CommitProperties, DeltaTable
from deltalake.writer import write_deltalake


def test_delete_no_predicates(existing_table: DeltaTable):
old_version = existing_table.version()

existing_table.delete(custom_metadata={"userName": "John Doe"})
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
existing_table.delete(commit_properties=commit_properties)

last_action = existing_table.history(1)[0]
assert last_action["operation"] == "DELETE"
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


def test_merge_when_matched_delete_wo_predicate(
Expand All @@ -20,12 +21,13 @@ def test_merge_when_matched_delete_wo_predicate(
}
)

commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.merge(
source=source_table,
predicate="t.id = s.id",
source_alias="s",
target_alias="t",
custom_metadata={"userName": "John Doe"},
commit_properties=commit_properties,
).when_matched_delete().execute()

nrows = 4
Expand Down
9 changes: 5 additions & 4 deletions python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
Expand Down Expand Up @@ -39,7 +40,8 @@ def test_optimize_run_table(
old_data = dt.to_pyarrow_table()
old_version = dt.version()

dt.optimize.compact(custom_metadata={"userName": "John Doe"})
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.optimize.compact(commit_properties=commit_properties)

new_data = dt.to_pyarrow_table()
last_action = dt.history(1)[0]
Expand Down Expand Up @@ -70,9 +72,8 @@ def test_z_order_optimize(
dt = DeltaTable(tmp_path)
old_version = dt.version()

dt.optimize.z_order(
["date32", "timestamp"], custom_metadata={"userName": "John Doe"}
)
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.optimize.z_order(["date32", "timestamp"], commit_properties=commit_properties)
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert last_action["userName"] == "John Doe"
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_repair.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


def test_repair_with_dry_run(tmp_path, sample_data):
Expand All @@ -23,7 +24,8 @@ def test_repair_wo_dry_run(tmp_path, sample_data):
dt = DeltaTable(tmp_path)
os.remove(dt.file_uris()[0])

metrics = dt.repair(dry_run=False, custom_metadata={"userName": "John Doe"})
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
metrics = dt.repair(dry_run=False, commit_properties=commit_properties)
last_action = dt.history(1)[0]

assert len(metrics["files_removed"]) == 1
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


@pytest.mark.parametrize("use_relative", [True, False])
Expand All @@ -24,7 +25,8 @@ def test_restore_with_version(

dt = DeltaTable(table_path)
old_version = dt.version()
dt.restore(1, custom_metadata={"userName": "John Doe"})
commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.restore(1, commit_properties=commit_properties)
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert last_action["userName"] == "John Doe"
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


@pytest.fixture()
Expand Down Expand Up @@ -38,10 +39,11 @@ def test_update_with_predicate(tmp_path: pathlib.Path, sample_table: pa.Table):
}
)

commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.update(
updates={"deleted": "True"},
predicate="price > 3",
custom_metadata={"userName": "John Doe"},
commit_properties=commit_properties,
)

result = dt.to_pyarrow_table()
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties


def test_vacuum_dry_run_simple_table():
Expand Down Expand Up @@ -72,11 +73,12 @@ def test_vacuum_transaction_log(tmp_path: pathlib.Path, sample_data: pa.Table):

dt = DeltaTable(tmp_path)

commit_properties = CommitProperties(custom_metadata={"userName": "John Doe"})
dt.vacuum(
retention_hours=0,
dry_run=False,
enforce_retention_duration=False,
custom_metadata={"userName": "John Doe"},
commit_properties=commit_properties,
)

dt = DeltaTable(tmp_path)
Expand Down

0 comments on commit ecd2a08

Please sign in to comment.