refactor(python)!: simplify marshalling of Fragment, DataFile, Operation, Transaction #3240

Merged 9 commits on Dec 20, 2024
Changes from 4 commits
106 changes: 36 additions & 70 deletions python/python/lance/dataset.py
@@ -11,7 +11,7 @@
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from abc import ABC
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
@@ -26,6 +26,7 @@
Optional,
Sequence,
Set,
Tuple,
TypedDict,
Union,
)
@@ -49,9 +50,6 @@
CleanupStats,
_Dataset,
_MergeInsertBuilder,
_Operation,
_RewriteGroup,
_RewrittenIndex,
_Scanner,
_write_dataset,
)
@@ -105,9 +103,35 @@ def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):

return super(MergeInsertBuilder, self).execute(reader)

def execute_uncommitted(
self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None
) -> Tuple[Transaction, Dict[str, Any]]:
"""Executes the merge insert operation without committing

This function stages the changes and returns both the transaction and a
dictionary with information about merge statistics - i.e. the number of
inserted, updated, and deleted rows.

Parameters
----------

data_obj: ReaderLike
The new data to use as the source table for the operation. This parameter
can be any source of data (e.g. table / dataset) that
:func:`~lance.write_dataset` accepts.
schema: Optional[pa.Schema]
The schema of the data. This only needs to be supplied whenever the data
source is some kind of generator.
"""
reader = _coerce_reader(data_obj, schema)

return super(MergeInsertBuilder, self).execute_uncommitted(reader)

# These next three overrides exist only to document the methods

def when_matched_update_all(self, condition: Optional[str] = None):
def when_matched_update_all(
self, condition: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to update matched rows

@@ -128,7 +152,7 @@ def when_matched_update_all(self, condition: Optional[str] = None):
"""
return super(MergeInsertBuilder, self).when_matched_update_all(condition)

def when_not_matched_insert_all(self):
def when_not_matched_insert_all(self) -> "MergeInsertBuilder":
"""
Configure the operation to insert not matched rows

@@ -138,7 +162,9 @@ def when_not_matched_insert_all(self):
"""
return super(MergeInsertBuilder, self).when_not_matched_insert_all()

def when_not_matched_by_source_delete(self, expr: Optional[str] = None):
def when_not_matched_by_source_delete(
self, expr: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to delete source rows that do not match

@@ -2216,7 +2242,7 @@ def commit(

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
operation,
read_version,
commit_lock,
storage_options=storage_options,
@@ -2315,19 +2341,6 @@ def commit_batch(
detached=detached,
max_retries=max_retries,
)
merged = Transaction(**merged)
# This logic is specific to append, which is all that should
# be returned here.
# TODO: generalize this to all other transaction types.
merged.operation["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.operation["fragments"]
]
merged.operation = LanceOperation.Append(**merged.operation)
if merged.blobs_op:
merged.blobs_op["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.blobs_op["fragments"]
]
merged.blobs_op = LanceOperation.Append(**merged.blobs_op)
Contributor (Author) commented on lines -2318 to -2330:
This is the messy marshaling logic that made me want this refactor. As I extend to other transaction types, this would have become completely unmanageable.
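
The block removed above re-built the merged transaction by hand, and only for the append case. For context, a rough sketch of the caller-facing flow this code path serves, assuming transactions collected from uncommitted writes (for example execute_uncommitted above) and a dataset at ./my_dataset; the exact commit_batch signature and return value are assumptions based on the surrounding diff:

import lance

ds = lance.dataset("./my_dataset")

# `transactions` is assumed to be a list of Transaction objects produced by
# uncommitted operations such as MergeInsertBuilder.execute_uncommitted.
result = lance.LanceDataset.commit_batch(
    "./my_dataset",
    transactions,
    read_version=ds.version,
)
# commit_batch is expected to hand back the new dataset together with a single
# merged transaction; after this refactor that transaction arrives fully formed
# from the native layer instead of being rebuilt field by field in Python.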

ds = LanceDataset.__new__(LanceDataset)
ds._ds = new_ds
ds._uri = new_ds.uri
@@ -2413,10 +2426,6 @@ class BaseOperation(ABC):
See available operations under :class:`LanceOperation`.
"""

@abstractmethod
def _to_inner(self):
raise NotImplementedError()

@dataclass
class Overwrite(BaseOperation):
"""
@@ -2460,7 +2469,7 @@ class Overwrite(BaseOperation):
3 4 d
"""

new_schema: pa.Schema
new_schema: LanceSchema | pa.Schema
fragments: Iterable[FragmentMetadata]

def __post_init__(self):
@@ -2470,10 +2479,6 @@ def __post_init__(self):
)
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.overwrite(self.new_schema, raw_fragments)

@dataclass
class Append(BaseOperation):
"""
@@ -2520,10 +2525,6 @@ class Append(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.append(raw_fragments)

@dataclass
class Delete(BaseOperation):
"""
@@ -2592,12 +2593,6 @@ class Delete(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)

def _to_inner(self):
raw_updated_fragments = [f._metadata for f in self.updated_fragments]
return _Operation.delete(
raw_updated_fragments, self.deleted_fragment_ids, self.predicate
)

@dataclass
class Merge(BaseOperation):
"""
@@ -2658,18 +2653,14 @@ class Merge(BaseOperation):
schema: LanceSchema | pa.Schema

def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
if isinstance(self.schema, pa.Schema):
warnings.warn(
"Passing a pyarrow.Schema to Merge is deprecated. "
"Please use a LanceSchema instead.",
DeprecationWarning,
)
self.schema = LanceSchema.from_pyarrow(self.schema)
return _Operation.merge(raw_fragments, self.schema)
LanceOperation._validate_fragments(self.fragments)

@dataclass
class Restore(BaseOperation):
@@ -2679,9 +2670,6 @@

version: int

def _to_inner(self):
return _Operation.restore(self.version)

@dataclass
class RewriteGroup:
"""
@@ -2691,11 +2679,6 @@
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def _to_inner(self):
old_fragments = [f._metadata for f in self.old_fragments]
new_fragments = [f._metadata for f in self.new_fragments]
return _RewriteGroup(old_fragments, new_fragments)

@dataclass
class RewrittenIndex:
"""
@@ -2705,9 +2688,6 @@
old_id: str
new_id: str

def _to_inner(self):
return _RewrittenIndex(self.old_id, self.new_id)

@dataclass
class Rewrite(BaseOperation):
"""
@@ -2734,11 +2714,6 @@ def __post_init__(self):
all_frags += [new for group in self.groups for new in group.new_fragments]
LanceOperation._validate_fragments(all_frags)

def _to_inner(self):
groups = [group._to_inner() for group in self.groups]
rewritten_indices = [index._to_inner() for index in self.rewritten_indices]
return _Operation.rewrite(groups, rewritten_indices)

@dataclass
class CreateIndex(BaseOperation):
"""
@@ -2751,15 +2726,6 @@ class CreateIndex(BaseOperation):
dataset_version: int
fragment_ids: Set[int]

def _to_inner(self):
return _Operation.create_index(
self.uuid,
self.name,
self.fields,
self.dataset_version,
self.fragment_ids,
)


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
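
With the _to_inner trampolines removed, the LanceOperation dataclasses are handed to commit unchanged and converted inside the native module. A short sketch of that commit path for the append case, with the data, paths, and versions made up for illustration:

import lance
import pyarrow as pa
from lance.fragment import write_fragments

# Hypothetical distributed-writer pattern: workers write fragments,
# a coordinator commits them against a known read version.
data = pa.table({"id": [1, 2], "value": ["a", "b"]})
fragments = write_fragments(data, "./my_dataset")

ds = lance.dataset("./my_dataset")
op = lance.LanceOperation.Append(fragments)
new_ds = lance.LanceDataset.commit("./my_dataset", op, read_version=ds.version)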