Skip to content

Commit

Permalink
Bump version to 0.9.9.3; add Redis and Sherlock dependencies; implement locking mechanism in DeltaTableLoader and DeltaTableWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
legout committed Feb 13, 2025
1 parent c0cacd8 commit 7468e35
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 24 deletions.
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ description = "A simple workflow framework. Hamilton + APScheduler = FlowerPower
authors = [{ name = "Volker L.", email = "[email protected]" }]
readme = "README.md"
requires-python = ">= 3.11"
version = "0.9.9.2"
version = "0.9.9.3"
keywords = ["hamilton", "workflow", "pipeline", "scheduler", "apscheduler", "dask", "ray"]
dependencies = [
'aiobotocore<2.18.0',
Expand Down Expand Up @@ -43,6 +43,8 @@ io = [
'polars>=1.15.0',
'pyarrow>=18.1.0',
'pydala2>=0.9.4.5',
"redis>=5.2.1",
"sherlock>=0.4.1",
]
io-legacy = [
"connectorx>=0.4.1",
Expand All @@ -54,6 +56,8 @@ io-legacy = [
'polars-lts-cpu>=1.15.0',
'pyarrow>=18.1.0',
'pydala2>=0.9.4.5',
"redis>=5.2.1",
"sherlock>=0.4.1",
]
mongodb = ["pymongo>=4.7.2"]
mqtt = ["paho-mqtt>=2.1.0", "orjson>=3.10.11"]
Expand Down
83 changes: 81 additions & 2 deletions src/flowerpower/io/loader/deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import datetime
from typing import Any

import pyarrow as pa
import pyarrow.dataset as pds
from deltalake import DeltaTable, table
from sherlock import RedisLock

from ..base import BaseDatasetLoader

# from hamilton.function_modifiers import dataloader
Expand All @@ -22,6 +23,8 @@ class DeltaTableLoader(BaseDatasetLoader):
"""

# Lazily (re)loadable handle to the underlying Delta table.
delta_table: DeltaTable | None = None
# When True, table-mutating operations are serialized via a Redis-backed lock.
with_lock: bool = False
# NOTE(review): declared as `str` here, but DeltaTableWriter declares
# `StrictRedis | Redis`, and sherlock's RedisLock `client=` expects a redis
# client instance — confirm whether a plain string is actually intended.
redis: str | None = None

def model_post_init(self, __context):
super().model_post_init(__context)
Expand All @@ -31,6 +34,8 @@ def model_post_init(self, __context):
self._raw_path,
storage_options=self.storage_options.to_object_store_kwargs(),
)
if self.with_lock and self.redis is None:
raise ValueError("Redis connection is required when using locks.")

@property
def dt(self) -> DeltaTable:
def to_pyarrow_table(self) -> pa.Table:
    """Materialize the Delta table as an in-memory PyArrow Table.

    Loads the entire table eagerly; for large tables a lazy dataset view
    may be preferable.
    """
    return self.delta_table.to_pyarrow_table()

def compact(
    self,
    partition_filters: list[tuple[str, str, Any]] | None = None,
    target_size: int | None = None,
    max_concurrent_tasks: int | None = None,
    min_commit_interval: int | datetime.timedelta | None = None,
    writer_properties: table.WriterProperties | None = None,
    custom_metadata: dict[str, str] | None = None,
    post_commithook_properties: table.PostCommitHookProperties | None = None,
    commit_properties: table.CommitProperties | None = None,
) -> dict[str, Any]:
    """Run bin-packing compaction on the underlying Delta table.

    When ``with_lock`` is True the operation is serialized across processes
    with a Redis-backed lock keyed on the table path, so concurrent
    maintenance jobs cannot compact the same table simultaneously.

    Args:
        partition_filters: DNF partition predicates limiting which partitions
            are compacted; ``None`` compacts the whole table.
        target_size: Desired output file size in bytes (delta-rs default when
            ``None``).
        max_concurrent_tasks: Upper bound on parallel rewrite tasks.
        min_commit_interval: Interval (seconds or ``timedelta``) between
            incremental commits for long-running compactions.
        writer_properties: Parquet writer configuration.
        custom_metadata: Extra key/value metadata recorded in the commit.
        post_commithook_properties: Post-commit hook configuration.
        commit_properties: Commit-level properties (e.g. app metadata).

    Returns:
        The metrics dict produced by ``DeltaTable.compact`` (files added /
        removed, sizes, etc.). Previously this value was silently discarded
        even though the return annotation promised it; callers that ignored
        the return value are unaffected.
    """

    def _compact() -> dict[str, Any]:
        return self.delta_table.compact(
            partition_filters=partition_filters,
            target_size=target_size,
            max_concurrent_tasks=max_concurrent_tasks,
            min_commit_interval=min_commit_interval,
            writer_properties=writer_properties,
            custom_metadata=custom_metadata,
            post_commithook_properties=post_commithook_properties,
            commit_properties=commit_properties,
        )

    if self.with_lock:
        # Lock name is the table path so independent tables never contend.
        with RedisLock(
            lock_name=self._raw_path,
            namespace="flowerpower",
            client=self.redis,
            expire=10,
            timeout=5,
            retry_interval=0.1,
        ):
            return _compact()
    return _compact()

def z_order(
    self,
    columns: list[str],
    partition_filters: list[tuple[str, str, Any]] | None = None,
    target_size: int | None = None,
    max_concurrent_tasks: int | None = None,
    min_commit_interval: int | datetime.timedelta | None = None,
    writer_properties: table.WriterProperties | None = None,
    custom_metadata: dict[str, str] | None = None,
    post_commithook_properties: table.PostCommitHookProperties | None = None,
    commit_properties: table.CommitProperties | None = None,
) -> dict[str, Any]:
    """Reorder the table's data files with Z-ordering on ``columns``.

    Z-ordering co-locates related values across the given columns to improve
    data skipping. When ``with_lock`` is True the operation is serialized
    across processes with a Redis-backed lock keyed on the table path.

    Args:
        columns: Columns to Z-order by.
        partition_filters: DNF partition predicates limiting which partitions
            are rewritten; ``None`` rewrites the whole table.
        target_size: Desired output file size in bytes (delta-rs default when
            ``None``).
        max_concurrent_tasks: Upper bound on parallel rewrite tasks.
        min_commit_interval: Interval (seconds or ``timedelta``) between
            incremental commits for long-running rewrites.
        writer_properties: Parquet writer configuration.
        custom_metadata: Extra key/value metadata recorded in the commit.
        post_commithook_properties: Post-commit hook configuration.
        commit_properties: Commit-level properties (e.g. app metadata).

    Returns:
        The metrics dict produced by ``DeltaTable.z_order``. Previously this
        value was silently discarded even though the return annotation
        promised it; callers that ignored the return value are unaffected.
    """

    def _z_order() -> dict[str, Any]:
        return self.delta_table.z_order(
            columns=columns,
            partition_filters=partition_filters,
            target_size=target_size,
            max_concurrent_tasks=max_concurrent_tasks,
            min_commit_interval=min_commit_interval,
            writer_properties=writer_properties,
            custom_metadata=custom_metadata,
            post_commithook_properties=post_commithook_properties,
            commit_properties=commit_properties,
        )

    if self.with_lock:
        # Lock name is the table path so independent tables never contend.
        with RedisLock(
            lock_name=self._raw_path,
            namespace="flowerpower",
            client=self.redis,
            expire=10,
            timeout=5,
            retry_interval=0.1,
        ):
            return _z_order()
    return _z_order()

# def to_polars(self, lazy: bool = True) -> pl.DataFrame | pl.LazyFrame:
# """Converts the DeltaTable to a Polars DataFrame."""
# if lazy:
Expand Down
60 changes: 40 additions & 20 deletions src/flowerpower/io/saver/deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import pandas as pd
import polars as pl
import pyarrow as pa
from deltalake.table import (ColumnProperties, CommitProperties,
PostCommitHookProperties)
from deltalake.table import ColumnProperties, CommitProperties, PostCommitHookProperties
from deltalake.writer import WriterProperties, write_deltalake

from ...utils.misc import _dict_to_dataframe
from ..base import BaseDatasetWriter
from sherlock import RedisLock
from redis import StrictRedis, Redis


class DeltaTableWriter(BaseDatasetWriter):
Expand All @@ -24,10 +25,14 @@ class DeltaTableWriter(BaseDatasetWriter):
"""

description: str | None = None
with_lock: bool = False
redis: StrictRedis | Redis | None = None

def model_post_init(self, __context):
    """Finalize construction: pin the output format and validate lock config."""
    super().model_post_init(__context)
    self.format = "delta"
    # A lock without a Redis client could never be acquired — fail fast
    # at construction time instead of on the first write.
    if self.redis is None and self.with_lock:
        raise ValueError("Redis connection is required when using locks.")

def write(
self,
Expand Down Expand Up @@ -107,21 +112,36 @@ def write(
default_column_properties=default_column_properties,
column_properties=column_properties,
)
write_deltalake(
self._raw_path,
data,
mode=mode,
schema=schema or self.schema_,
partition_by=partition_by or self.partition_by,
storage_options=self.storage_options.to_object_store_kwargs(),
description=self.description,
schema_mode=schema_mode,
partition_filters=partition_filters,
predicate=predicate,
target_file_size=target_file_size,
large_dtypes=large_dtypes,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties,
commit_properties=commit_properties,
writer_properties=writer_properties,
)

def _write():
write_deltalake(
self._raw_path,
data,
mode=mode,
schema=schema or self.schema_,
partition_by=partition_by or self.partition_by,
storage_options=self.storage_options.to_object_store_kwargs(),
description=self.description,
schema_mode=schema_mode,
partition_filters=partition_filters,
predicate=predicate,
target_file_size=target_file_size,
large_dtypes=large_dtypes,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties,
commit_properties=commit_properties,
writer_properties=writer_properties,
)

if self.with_lock:
with RedisLock(
lock_name=self._raw_path,
namespace="flowerpower",
client=self.redis,
expire=10,
timeout=5,
retry_interval=0.1,
):
_write()
else:
_write()
19 changes: 18 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7468e35

Please sign in to comment.