-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Python: TableScan Plan files API implementation without residual evaluation #6069
Changes from all commits
7e30879
79770f7
65bb08b
da71c2b
998003e
30773a5
63ada10
f8a5a93
970341e
d92bf6b
c508250
51babdd
fd04698
736509a
aa2ec55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,9 +14,11 @@ | |||||
# KIND, either express or implied. See the License for the | ||||||
# specific language governing permissions and limitations | ||||||
# under the License. | ||||||
from __future__ import annotations | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a community, we probably should make a decision on this one. Sometimes we use it, but not all the time. Personally I find There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like Optional[str] too, but I think for some reason when I run There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because the import statement. PyUpgrade will automatically rewrite the imports: https://github.com/asottile/pyupgrade#pep-604-typing-rewrites We can change this in another PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've reverted to not use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should have been fixed now #6114 is in. Delayed loading of the annotations (that's what the import does), can be helpful sometime :) |
||||||
|
||||||
import json | ||||||
from abc import ABC, abstractmethod | ||||||
from typing import Any, List, Optional | ||||||
from typing import Any, Iterable | ||||||
from uuid import UUID | ||||||
|
||||||
from rich.console import Console | ||||||
|
@@ -27,6 +29,7 @@ | |||||
from pyiceberg.schema import Schema | ||||||
from pyiceberg.table import Table | ||||||
from pyiceberg.table.partitioning import PartitionSpec | ||||||
from pyiceberg.table.scan import FileScanTask | ||||||
from pyiceberg.typedef import Identifier, Properties | ||||||
|
||||||
|
||||||
|
@@ -49,6 +52,10 @@ def describe_table(self, table: Table) -> None: | |||||
def files(self, table: Table, io: FileIO, history: bool) -> None: | ||||||
... | ||||||
|
||||||
@abstractmethod | ||||||
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: Optional[int] = None) -> None: | ||||||
... | ||||||
|
||||||
@abstractmethod | ||||||
def describe_properties(self, properties: Properties) -> None: | ||||||
... | ||||||
|
@@ -88,7 +95,7 @@ def exception(self, ex: Exception) -> None: | |||||
else: | ||||||
Console(stderr=True).print(ex) | ||||||
|
||||||
def identifiers(self, identifiers: List[Identifier]) -> None: | ||||||
def identifiers(self, identifiers: list[Identifier]) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
table = self._table | ||||||
for identifier in identifiers: | ||||||
table.add_row(".".join(identifier)) | ||||||
|
@@ -146,6 +153,19 @@ def files(self, table: Table, io: FileIO, history: bool) -> None: | |||||
manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}") | ||||||
Console().print(snapshot_tree) | ||||||
|
||||||
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None: | ||||||
snapshot_tree = Tree(f"Snapshot: {snapshot_id}") | ||||||
|
||||||
manifest_dict = {} | ||||||
for file in plan_files: | ||||||
if file.manifest.manifest_path not in manifest_dict: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Should we move |
||||||
manifest_tree = snapshot_tree.add(f"Manifest: {file.manifest.manifest_path}") | ||||||
manifest_dict[file.manifest.manifest_path] = manifest_tree | ||||||
|
||||||
manifest_dict[file.manifest.manifest_path].add(f"Data File: {file.data_file.file_path}") | ||||||
|
||||||
Console().print(snapshot_tree) | ||||||
|
||||||
def describe_properties(self, properties: Properties) -> None: | ||||||
output_table = self._table | ||||||
for k, v in properties.items(): | ||||||
|
@@ -164,7 +184,7 @@ def schema(self, schema: Schema) -> None: | |||||
def spec(self, spec: PartitionSpec) -> None: | ||||||
Console().print(str(spec)) | ||||||
|
||||||
def uuid(self, uuid: Optional[UUID]) -> None: | ||||||
def uuid(self, uuid: UUID | None) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
Console().print(str(uuid) if uuid else "missing") | ||||||
|
||||||
|
||||||
|
@@ -182,7 +202,7 @@ def _out(self, d: Any) -> None: | |||||
def exception(self, ex: Exception) -> None: | ||||||
self._out({"type": ex.__class__.__name__, "message": str(ex)}) | ||||||
|
||||||
def identifiers(self, identifiers: List[Identifier]) -> None: | ||||||
def identifiers(self, identifiers: list[Identifier]) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
self._out([".".join(identifier) for identifier in identifiers]) | ||||||
|
||||||
def describe_table(self, table: Table) -> None: | ||||||
|
@@ -200,8 +220,11 @@ def schema(self, schema: Schema) -> None: | |||||
def files(self, table: Table, io: FileIO, history: bool) -> None: | ||||||
pass | ||||||
|
||||||
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None: | ||||||
pass | ||||||
|
||||||
def spec(self, spec: PartitionSpec) -> None: | ||||||
print(spec.json()) | ||||||
|
||||||
def uuid(self, uuid: Optional[UUID]) -> None: | ||||||
def uuid(self, uuid: UUID | None) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
self._out({"uuid": str(uuid) if uuid else "missing"}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -436,8 +436,10 @@ def __init__(self, partition_struct_schema: Schema, partition_filter: BooleanExp | |
self.partition_filter = bind(partition_struct_schema, rewrite_not(partition_filter), case_sensitive) | ||
|
||
def eval(self, manifest: ManifestFile) -> bool: | ||
print(f"Evaluating ManifestFile = {manifest} with partition_filter={self.partition_filter}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is still around from the debugging? Maybe convert it into a log statement? |
||
if partitions := manifest.partitions: | ||
self.partition_fields = partitions | ||
print(f"self.partition_fields = {self.partition_fields }") | ||
return visit(self.partition_filter, self) | ||
|
||
# No partition information | ||
|
@@ -517,7 +519,7 @@ def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool: | |
pos = term.ref().accessor.position | ||
field = self.partition_fields[pos] | ||
|
||
if field.lower_bound is None: | ||
if field.lower_bound is None or field.upper_bound is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great catch! Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If one is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is required for the linter since we're the variable below. |
||
# values are all null and literal cannot contain null | ||
return ROWS_CANNOT_MATCH | ||
|
||
|
@@ -526,7 +528,7 @@ def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool: | |
if lower > literal.value: | ||
return ROWS_CANNOT_MATCH | ||
|
||
upper = _from_byte_buffer(term.ref().field.field_type, field.lower_bound) | ||
upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Fokko, can you check why tests didn't catch this case? We should ensure this is tested properly and separate the fix into its own PR. |
||
|
||
if literal.value > upper: | ||
return ROWS_CANNOT_MATCH | ||
|
@@ -613,6 +615,9 @@ def visit_or(self, left_result: bool, right_result: bool) -> bool: | |
def manifest_evaluator( | ||
partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True | ||
) -> Callable[[ManifestFile], bool]: | ||
partition_schema = Schema(*partition_spec.partition_type(schema)) | ||
partition_type = partition_spec.partition_type(schema) | ||
partition_schema = Schema(*partition_type.fields) | ||
partition_filter = partition_filter or AlwaysTrue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since |
||
print(f"partition_schema= {partition_schema} partition_filter={partition_filter}") | ||
evaluator = _ManifestEvalVisitor(partition_schema, partition_filter, case_sensitive) | ||
return evaluator.eval |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
import itertools | ||
from abc import ABC, abstractmethod | ||
from typing import Iterable, List, Optional | ||
|
||
from pydantic import Field | ||
|
||
from pyiceberg.expressions import AlwaysTrue, BooleanExpression | ||
from pyiceberg.expressions.visitors import manifest_evaluator | ||
from pyiceberg.io import FileIO | ||
from pyiceberg.manifest import DataFile, ManifestFile | ||
from pyiceberg.table import PartitionSpec, Snapshot, Table | ||
from pyiceberg.utils.iceberg_base_model import IcebergBaseModel | ||
|
||
ALWAYS_TRUE = AlwaysTrue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer not adding these constants everywhere. Just use |
||
|
||
|
||
class FileScanTask(IcebergBaseModel): | ||
"""A scan task over a range of bytes in a single data file.""" | ||
|
||
manifest: ManifestFile = Field() | ||
data_file: DataFile = Field() | ||
_residual: BooleanExpression = Field() | ||
spec: PartitionSpec = Field() | ||
start: int = Field(default=0) | ||
|
||
@property | ||
def length(self) -> int: | ||
return self.data_file.file_size_in_bytes | ||
|
||
|
||
class TableScan(ABC): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this API not match the Java API? Is there something more pythonic about this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wanted to build the API incrementally and add things as and when needed, otherwise it becomes a bloated PR. |
||
"""API for configuring a table scan.""" | ||
|
||
table: Table | ||
snapshot: Snapshot | ||
expression: BooleanExpression | ||
|
||
def __init__(self, table: Table, snapshot: Optional[Snapshot] = None, expression: BooleanExpression = ALWAYS_TRUE): | ||
self.table = table | ||
self.expression = expression or ALWAYS_TRUE | ||
if resolved_snapshot := snapshot or table.current_snapshot(): | ||
self.snapshot = resolved_snapshot | ||
else: | ||
raise ValueError("Unable to resolve to a Snapshot to use for the table scan.") | ||
|
||
@abstractmethod | ||
def plan_files(self) -> Iterable[FileScanTask]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Java, there are several different types of tasks. I'd recommend using the same pattern and not requiring this to be |
||
"""Plan tasks for this scan where each task reads a single file. | ||
|
||
Returns: | ||
Table: a tuple of tasks scanning entire files required by this scan | ||
""" | ||
|
||
|
||
class DataTableScan(TableScan): | ||
"""API for configuring a table scan.""" | ||
|
||
io: FileIO | ||
|
||
def __init__( | ||
self, io: FileIO, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE | ||
): | ||
self.io = io | ||
super().__init__(table, snapshot, expression) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Convention is to first call the super, and then do the rest There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will make the change. |
||
|
||
def plan_files(self) -> Iterable[FileScanTask]: | ||
matching_manifests = [ | ||
manifest | ||
for manifest in self.snapshot.fetch_manifest_list(self.io) | ||
if manifest_evaluator( | ||
self.table.specs()[manifest.partition_spec_id], | ||
self.table.schemas()[self.snapshot.schema_id] if self.snapshot.schema_id is not None else self.table.schema(), | ||
self.expression, | ||
)(manifest) | ||
] | ||
|
||
return itertools.chain.from_iterable( | ||
[self._fetch_file_scan_tasks_for_manifest(manifest) for manifest in matching_manifests] | ||
) | ||
|
||
def _fetch_file_scan_tasks_for_manifest(self, manifest: ManifestFile) -> List[FileScanTask]: | ||
manifest_entries = manifest.fetch_manifest_entry(self.io) | ||
data_files = [entry.data_file for entry in manifest_entries] | ||
|
||
spec = self.table.specs().get(manifest.partition_spec_id) | ||
# Row level filters to be implemented. Need projections for evaluating residual. Skipping for the time being. | ||
return [FileScanTask(manifest=manifest, data_file=file, spec=spec, residual=None) for file in data_files] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that we can easily visualize the files. I think we should merge this one with the files command since they are so similar. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I follow you @Fokko . Do you mean make
scan
a sub-command offiles
command? Or just havefiles
command use thetable.new_scan().plan_files()
API underneath?If it's the later, I like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the later one is more flexible, but then we should remove the files because they have so much overlap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only difference being
files
at present lists all snapshots. The table scanplan_files()
will only list the snapshot that is being used for the scan.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can make files with three options:
files --snapshot=all
- The current files behaviorfiles --snapshot=current
- Files under the current snapshotfiles --snapshot=<snapshot_id>
- Files under the snapshot_id specifiedThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not move it to the scan subcommand? This way we don't have similar commands