Skip to content

Commit

Permalink
Improve transaction log commits (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
xbrianh authored Sep 10, 2024
1 parent 7f11b3e commit b0685b3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 50 deletions.
61 changes: 18 additions & 43 deletions xdlake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,22 @@ class DeltaTable:
def __init__(
self,
loc: str | storage.Location,
log_loc: str | storage.Location | None = None,
location_or_log: str | storage.Location | delta_log.DeltaLog | None = None,
version: int | None = None,
storage_options: dict | None = None,
):
self.loc = storage.Location.with_location(loc, storage_options=storage_options)
if log_loc is None:
self.log_loc = self.loc.append_path("_delta_log")
else:
self.log_loc = storage.Location.with_location(log_loc, storage_options=storage_options)
self.dlog = delta_log.DeltaLog.with_location(self.log_loc, version=version)
match location_or_log:
case delta_log.DeltaLog():
self.dlog = location_or_log
case None | str() | storage.Location():
if location_or_log is None:
log_loc = self.loc.append_path("_delta_log")
else:
log_loc = storage.Location.with_location(location_or_log, storage_options=storage_options)
self.dlog = delta_log.DeltaLog.with_location(log_loc, version=version)
case _:
raise TypeError(f"Unexpected type for 'location_or_log': {type(location_or_log)}")
if self.dlog.entries:
self.adds = self.dlog.add_actions()
self.partition_columns = self.dlog.partition_columns()
Expand Down Expand Up @@ -70,7 +76,7 @@ def load_as_version(self, version: int) -> "DeltaTable":
Returns:
DeltaTable: A new DeltaTable instance.
"""
return type(self)(self.loc, self.log_loc, version=version)
return type(self)(self.loc, self.dlog.loc, version=version)

def add_action_to_fragment(self, add: delta_log.Add) -> tuple[storage.Location, pa.dataset.Fragment]:
"""Convert a delta log add action to a pyarrow dataset fragment.
Expand Down Expand Up @@ -165,8 +171,8 @@ def write(
ds = dataset_utils.union_dataset(data)
schema = self.dlog.evaluate_schema(ds.schema, mode, schema_mode)
new_add_actions = self.write_data(ds, partition_by, write_arrow_dataset_options)
self.write_deltalog_entry(mode, schema, new_add_actions, partition_by)
return type(self)(self.loc, self.log_loc)
entry = self.dlog.entry_for_write_mode(mode, schema, new_add_actions, partition_by)
return type(self)(self.loc, self.dlog.commit(entry))

def import_refs(
self,
Expand Down Expand Up @@ -202,8 +208,8 @@ def import_refs(
new_add_actions = list()
for child_ds in ds.children:
new_add_actions.extend(self.add_actions_for_foreign_dataset(child_ds))
self.write_deltalog_entry(mode, schema, new_add_actions, partition_by)
return type(self)(self.loc, self.log_loc)
entry = self.dlog.entry_for_write_mode(mode, schema, new_add_actions, partition_by)
return type(self)(self.loc, self.dlog.commit(entry))

def clone(self, dst_loc: str | storage.Location, dst_log_loc: str | None = None) -> "DeltaTable":
"""Clone the DeltaTable
Expand Down Expand Up @@ -282,9 +288,7 @@ def delete(self, where: pc.Expression, write_arrow_dataset_options: dict | None
num_copied_rows=num_copied_rows,
num_deleted_rows=num_deleted_rows,
)
with self.log_loc.append_path(utils.filename_for_version(self._version_to_write)).open(mode="w") as fh:
new_entry.write(fh)
return type(self)(self.loc, self.log_loc)
return type(self)(self.loc, self.dlog.commit(new_entry))

def write_data(
self,
Expand Down Expand Up @@ -375,32 +379,3 @@ def add_actions_for_foreign_dataset(self, ds: pa.dataset.FileSystemDataset) -> l
)

return add_actions

def write_deltalog_entry(
self,
mode,
schema: delta_log.Schema,
add_actions: list[delta_log.Add],
partition_by: list | None = None,
):
"""Write a new delta log entry.
Args:
mode (WriteMode): Write mode.
schema (delta_log.Schema): Schema.
add_actions (list[delta_log.Add]): Add actions.
partition_by (list[str], optional): Partition
"""
partition_by = partition_by or list()
new_entry = delta_log.DeltaLogEntry()
if 0 == self._version_to_write:
new_entry = delta_log.DeltaLogEntry.commit_create_table(self.log_loc.path, schema, partition_by, add_actions)
self.log_loc.mkdir()
elif delta_log.WriteMode.append == mode:
new_entry = delta_log.DeltaLogEntry.commit_append_table(partition_by, add_actions, schema)
elif delta_log.WriteMode.overwrite == mode:
existing_add_actions = self.dlog.add_actions().values()
new_entry = delta_log.DeltaLogEntry.commit_overwrite_table(partition_by, existing_add_actions, add_actions)

with self.log_loc.append_path(utils.filename_for_version(self._version_to_write)).open(mode="w") as fh:
new_entry.write(fh)
54 changes: 47 additions & 7 deletions xdlake/delta_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,9 @@ class DeltaLog:

_log_entry_filename_re = re.compile("^\d+\.json$")

def __init__(self):
self.entries = dict()
def __init__(self, location: storage.Location):
self.entries: dict[int, DeltaLogEntry] = dict()
self.loc = location

def __setitem__(self, key, val):
self.entries[key] = val
Expand Down Expand Up @@ -562,10 +563,9 @@ def with_location(
Returns:
delta_log.DeltaLog
"""
loc = storage.Location.with_location(loc, storage_options=storage_options)
dlog = cls()
if loc.exists():
for entry_loc in loc.list_files_sorted():
dlog = cls(storage.Location.with_location(loc, storage_options=storage_options))
if dlog.loc.exists():
for entry_loc in dlog.loc.list_files_sorted():
filename = entry_loc.basename()
if cls._log_entry_filename_re.match(filename):
entry_version = int(filename.split(".", 1)[0])
Expand All @@ -587,6 +587,13 @@ def versions(self) -> list[int]:
return sorted(self.entries.keys())
else:
raise ValueError("This delta log is empty!")
@property
def version_to_write(self) -> int:
"""The next log version."""
try:
return 1 + self.versions[-1]
except ValueError:
return 0

def schema(self) -> Schema:
"""The latest schema in the log."""
Expand All @@ -609,7 +616,7 @@ def add_actions(self) -> dict[str, Add]:

def partition_columns(self) -> list:
"""The partition columns of the latest version."""
cols = list()
cols: list | None = list()
for v in sorted(self.entries.keys(), reverse=True):
cols = self.entries[v].partition_columns()
if cols is not None:
Expand Down Expand Up @@ -655,6 +662,39 @@ def evaluate_schema(self, pyarrow_schema: pa.Schema, write_mode: WriteMode, sche
raise ValueError("Schema mismatch")
return schema

def entry_for_write_mode(
self,
mode,
schema: Schema,
add_actions: list[Add],
partition_by: list | None = None,
):
"""Write a new delta log entry.
Args:
mode (WriteMode): Write mode.
schema (Schema): Schema.
add_actions (list[Add]): Add actions.
partition_by (list[str], optional): Partition
"""
partition_by = partition_by or list()
entry = DeltaLogEntry()
if 0 == self.version_to_write:
entry = DeltaLogEntry.commit_create_table(self.loc.path, schema, partition_by, add_actions)
elif WriteMode.append == mode:
entry = DeltaLogEntry.commit_append_table(partition_by, add_actions, schema)
elif WriteMode.overwrite == mode:
existing_add_actions = self.add_actions().values()
entry = DeltaLogEntry.commit_overwrite_table(partition_by, existing_add_actions, add_actions)
return entry

def commit(self, entry: DeltaLogEntry) -> "DeltaLog":
if 0 == self.version_to_write:
self.loc.mkdir()
with self.loc.append_path(utils.filename_for_version(self.version_to_write)).open(mode="w") as fh:
entry.write(fh)
return type(self).with_location(self.loc)


def generate_remove_acctions(add_actions: Iterable[Add]) -> list[Remove]:
"""Generate remove actions from add actions.
Expand Down

0 comments on commit b0685b3

Please sign in to comment.