Skip to content

Commit

Permalink
Make delta_log storage aware (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
xbrianh authored Sep 10, 2024
1 parent 5cbe30e commit 7f11b3e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 37 deletions.
4 changes: 2 additions & 2 deletions tests/test_delta_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_schema(self):
],
type="struct",
)
dlog = xdlake.read_delta_log(LOGDIR)
dlog = delta_log.DeltaLog.with_location(LOGDIR)
s = dlog.schema()
self.assertEqual(s, expected)

Expand All @@ -54,7 +54,7 @@ def test_partition_columns(self):
for expected_partition_columns in (list(), [f"{uuid4()}" for _ in range(3)]):
with self.subTest(expected=expected_partition_columns):
tc = delta_log.DeltaLogEntry.commit_overwrite_table(expected_partition_columns, [], [])
dlog = xdlake.read_delta_log(LOGDIR)
dlog = delta_log.DeltaLog.with_location(LOGDIR)
dlog.entries[len(dlog.entries)] = tc
self.assertEqual(expected_partition_columns, dlog.partition_columns())

Expand Down
35 changes: 1 addition & 34 deletions xdlake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import re
import functools
import operator
from uuid import uuid4
Expand All @@ -13,38 +12,6 @@
from xdlake import delta_log, dataset_utils, storage, utils


# Matches transaction log entry filenames of the form "<digits>.json".
# A raw string is used so that "\d" and "\." reach the regex engine verbatim
# instead of being parsed as (invalid) string escape sequences.
_log_entry_filename_re = re.compile(r"^\d+\.json$")


def read_delta_log(
    loc: str | storage.Location,
    version: int | None = None,
    storage_options: dict | None = None,
) -> delta_log.DeltaLog:
    """Load the transaction log of a delta table.

    Files in the log directory are visited in the order produced by
    ``list_files_sorted``; only files named ``<digits>.json`` are parsed, and
    each one is stored in the log under the integer taken from its filename.

    Args:
        loc (str | Location): Root of the transaction log directory.
        version (int, optional): Stop reading once this version has been loaded.
        storage_options (dict, optional): keyword arguments to pass to fsspec.filesystem

    Returns:
        delta_log.DeltaLog: The entries that were read; empty when the log
        directory does not exist.
    """
    log_root = storage.Location.with_location(loc, storage_options=storage_options)
    log = delta_log.DeltaLog()

    # Nothing to read for a table that has no log directory yet.
    if not log_root.exists():
        return log

    for candidate in log_root.list_files_sorted():
        name = candidate.basename()
        if not _log_entry_filename_re.match(name):
            # Skip anything in the directory that is not a "<digits>.json" entry.
            continue
        entry_number = int(name.partition(".")[0])
        with candidate.open() as handle:
            log[entry_number] = delta_log.DeltaLogEntry.with_handle(handle)
        # With version=None this membership test is never true, so every entry is read.
        if version in log:
            break

    return log


class DeltaTable:
"""A DeltaTable is a high-level API for working with Delta Lake tables.
Expand Down Expand Up @@ -73,7 +40,7 @@ def __init__(
self.log_loc = self.loc.append_path("_delta_log")
else:
self.log_loc = storage.Location.with_location(log_loc, storage_options=storage_options)
self.dlog = read_delta_log(self.log_loc, version=version)
self.dlog = delta_log.DeltaLog.with_location(self.log_loc, version=version)
if self.dlog.entries:
self.adds = self.dlog.add_actions()
self.partition_columns = self.dlog.partition_columns()
Expand Down
35 changes: 34 additions & 1 deletion xdlake/delta_log.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import json
import datetime
from enum import Enum
Expand All @@ -9,7 +10,7 @@

import pyarrow as pa

from xdlake import utils
from xdlake import storage, utils


CLIENT_VERSION = "xdlake-0.0.0"
Expand Down Expand Up @@ -530,6 +531,8 @@ def commit_delete_table(
class DeltaLog:
"""The transaction log of a delta table."""

# Matches transaction log entry filenames of the form "<digits>.json".
# A raw string is used so that "\d" and "\." reach the regex engine verbatim
# instead of being parsed as (invalid) string escape sequences.
_log_entry_filename_re = re.compile(r"^\d+\.json$")

def __init__(self):
self.entries = dict()

Expand All @@ -542,6 +545,36 @@ def __getitem__(self, key):
def __contains__(self, key):
    """Return True when `key` is a key of `self.entries`."""
    return key in self.entries

@classmethod
def with_location(
    cls,
    loc: str | storage.Location,
    version: int | None = None,
    storage_options: dict | None = None,
) -> "DeltaLog":
    """Build a log by reading a delta table transaction log directory.

    Files in the directory are visited in the order produced by
    ``list_files_sorted``; only files named ``<digits>.json`` are parsed, and
    each one is stored in the log under the integer taken from its filename.

    Args:
        loc (str | Location): Root of the transaction log directory.
        version (int, optional): Stop reading once this version has been loaded.
        storage_options (dict, optional): keyword arguments to pass to fsspec.filesystem

    Returns:
        DeltaLog: The entries that were read; empty when the log directory
        does not exist.
    """
    log_root = storage.Location.with_location(loc, storage_options=storage_options)
    log = cls()

    # Nothing to read for a table that has no log directory yet.
    if not log_root.exists():
        return log

    is_entry_name = cls._log_entry_filename_re.match
    for candidate in log_root.list_files_sorted():
        name = candidate.basename()
        if not is_entry_name(name):
            # Skip anything in the directory that is not a "<digits>.json" entry.
            continue
        entry_number = int(name.partition(".")[0])
        with candidate.open() as handle:
            log[entry_number] = DeltaLogEntry.with_handle(handle)
        # With version=None this membership test is never true, so every entry is read.
        if version in log:
            break

    return log

@property
def version(self) -> int:
"""The largest version in the log."""
Expand Down

0 comments on commit 7f11b3e

Please sign in to comment.