Python: TableScan Plan files API implementation without residual evaluation #6069

21 changes: 21 additions & 0 deletions python/pyiceberg/cli/console.py
@@ -361,3 +361,24 @@ def table(ctx: Context, identifier: str, property_name: str): # noqa: F811
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")


@run.group()
def scan():
"""Create a table scan."""


@scan.command("table")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def scan_table(ctx: Context, identifier: str):
Contributor:
I like that we can easily visualize the files. I think we should merge this one with the files command since they are so similar. WDYT?

➜  python git:(master) ✗ pyiceberg --verbose true --catalog fokko files hive_test.h1
Snapshots: fokko.hive_test.h1
└── Snapshot 4362649644256929116, schema 0: 
    s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/snap-4362649644256929116-1-63e88d4d-1b30-4cae-b7a0-f53b0b54758a.avro
    ├── Manifest: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/63e88d4d-1b30-4cae-b7a0-f53b0b54758a-m0.avro
    │   └── Datafile: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/data/0bfca5ff//00000-0-dweeks_20220901095219_f181f97a-43ce-4aa6-8cec-6956d83b7a42-job_local2002913683_0002-00001.parquet
    └── Manifest: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/508fd9ac-520f-47d8-900f-00f1dda051ac-m0.avro
        └── Datafile: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/data/3d030bcf/00000-0-44897130-c494-4dca-a164-e874c1e37c36-00001.parquet

Contributor Author:
Not sure I follow you, @Fokko. Do you mean making scan a sub-command of the files command, or just having the files command use the table.new_scan().plan_files() API underneath?

If it's the latter, I like that.

Contributor:
I think the latter is more flexible, but then we should remove the files command because the two have so much overlap.

Contributor Author:
The only difference is that files currently lists all snapshots, whereas the table scan's plan_files() only lists files for the snapshot being used for the scan.

Contributor Author:
Maybe we can give files three options (a rough sketch follows the list):

  • files --snapshot=all - The current files behavior
  • files --snapshot=current - Files under the current snapshot
  • files --snapshot=<snapshot_id> - Files under the snapshot_id specified
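
A rough sketch of what that could look like with click; the option name, default, and the dispatch branches are illustrative only, not part of this PR:

# Sketch only: a possible shape for the --snapshot option proposed above.
@run.command()
@click.argument("identifier")
@click.option("--snapshot", default="all", help="'all', 'current', or a snapshot id")
@click.pass_context
@catch_exception()
def files(ctx: Context, identifier: str, snapshot: str) -> None:
    """List data files for all snapshots, the current snapshot, or a specific snapshot id."""
    catalog, output = _catalog_and_output(ctx)
    table = catalog.load_table(identifier)
    if snapshot == "all":
        ...  # keep the current `files` behavior
    elif snapshot == "current":
        ...  # plan files against the current snapshot
    else:
        ...  # plan files against int(snapshot)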

Contributor:
Why not move it to the scan subcommand? That way we don't have two similar commands.

"""Lists all the data files that will be scanned as part of the table scan plan."""
catalog, output = _catalog_and_output(ctx)

catalog_table = catalog.load_table(identifier)
io = load_file_io({**catalog.properties, **catalog_table.metadata.properties})
file_scan_tasks = catalog_table.new_scan(io).plan_files()
snapshot = catalog_table.current_snapshot()
snapshot_id = snapshot.snapshot_id if snapshot is not None else None
output.scan_plan_files(file_scan_tasks, snapshot_id)
37 changes: 30 additions & 7 deletions python/pyiceberg/cli/output.py
@@ -14,9 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
Contributor:
As a community, we should probably make a decision on this one. Sometimes we use it, but not all the time. Personally, I find Optional[str] easier to read than str | None.

Contributor Author:
I like Optional[str] too, but for some reason when I run make lint locally it turns it into the latter format, i.e. str | None. Do we need to tweak the linter config to enforce one format over the other?

Contributor:
This is because of the import statement. PyUpgrade will automatically rewrite the imports: https://github.com/asottile/pyupgrade#pep-604-typing-rewrites We can change this in another PR.
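
For context, a minimal sketch of what the import changes; the function below is illustrative, not from this PR:

# With postponed evaluation of annotations (PEP 563), annotations are stored as
# strings and not evaluated at definition time, so PEP 604 unions like `str | None`
# parse even on Python versions older than 3.10.
from __future__ import annotations


def lookup(key: str) -> str | None:  # equivalent to Optional[str]
    return None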

Contributor Author:
I've reverted to not using from __future__ import annotations in a recent commit.

Contributor:
It should have been fixed now that #6114 is in. Delayed loading of the annotations (that's what the import does) can be helpful sometimes :)


import json
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import Any, Iterable
from uuid import UUID

from rich.console import Console
@@ -27,6 +29,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.partitioning import PartitionSpec
from pyiceberg.table.scan import FileScanTask
from pyiceberg.typedef import Identifier, Properties


@@ -38,7 +41,7 @@ def exception(self, ex: Exception) -> None:
...

@abstractmethod
def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
...

@abstractmethod
@@ -49,6 +52,10 @@ def describe_table(self, table: Table) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
...

@abstractmethod
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
...

@abstractmethod
def describe_properties(self, properties: Properties) -> None:
...
@@ -66,7 +73,7 @@ def spec(self, spec: PartitionSpec) -> None:
...

@abstractmethod
def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
...


@@ -88,7 +95,7 @@ def exception(self, ex: Exception) -> None:
else:
Console(stderr=True).print(ex)

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
Contributor:
Suggested change:
- def identifiers(self, identifiers: list[Identifier]) -> None:
+ def identifiers(self, identifiers: List[Identifier]) -> None:

table = self._table
for identifier in identifiers:
table.add_row(".".join(identifier))
@@ -146,6 +153,19 @@ def files(self, table: Table, io: FileIO, history: bool) -> None:
manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
Console().print(snapshot_tree)

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
snapshot_tree = Tree(f"Snapshot: {snapshot_id}")

manifest_dict = {}
for file in plan_files:
if file.manifest.manifest_path not in manifest_dict:
Contributor:
Nit: Should we move file.manifest.manifest_path into a local variable, since we use it three times?
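
A sketch of the loop above with the path hoisted into a local variable; behavior is unchanged:

# Same logic as scan_plan_files above, with the repeated attribute lookup hoisted.
for file in plan_files:
    manifest_path = file.manifest.manifest_path
    if manifest_path not in manifest_dict:
        manifest_dict[manifest_path] = snapshot_tree.add(f"Manifest: {manifest_path}")
    manifest_dict[manifest_path].add(f"Data File: {file.data_file.file_path}")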

manifest_tree = snapshot_tree.add(f"Manifest: {file.manifest.manifest_path}")
manifest_dict[file.manifest.manifest_path] = manifest_tree

manifest_dict[file.manifest.manifest_path].add(f"Data File: {file.data_file.file_path}")

Console().print(snapshot_tree)

def describe_properties(self, properties: Properties) -> None:
output_table = self._table
for k, v in properties.items():
@@ -164,7 +184,7 @@ def schema(self, schema: Schema) -> None:
def spec(self, spec: PartitionSpec) -> None:
Console().print(str(spec))

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
Contributor:
Suggested change:
- def uuid(self, uuid: UUID | None) -> None:
+ def uuid(self, uuid: Optional[UUID]) -> None:

Console().print(str(uuid) if uuid else "missing")


@@ -182,7 +202,7 @@ def _out(self, d: Any) -> None:
def exception(self, ex: Exception) -> None:
self._out({"type": ex.__class__.__name__, "message": str(ex)})

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
Contributor:
Suggested change:
- def identifiers(self, identifiers: list[Identifier]) -> None:
+ def identifiers(self, identifiers: List[Identifier]) -> None:

self._out([".".join(identifier) for identifier in identifiers])

def describe_table(self, table: Table) -> None:
@@ -200,8 +220,11 @@ def schema(self, schema: Schema) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
pass

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
pass

def spec(self, spec: PartitionSpec) -> None:
print(spec.json())

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
Contributor:
Suggested change:
- def uuid(self, uuid: UUID | None) -> None:
+ def uuid(self, uuid: Optional[UUID]) -> None:

self._out({"uuid": str(uuid) if uuid else "missing"})
11 changes: 8 additions & 3 deletions python/pyiceberg/expressions/visitors.py
@@ -436,8 +436,10 @@ def __init__(self, partition_struct_schema: Schema, partition_filter: BooleanExp
self.partition_filter = bind(partition_struct_schema, rewrite_not(partition_filter), case_sensitive)

def eval(self, manifest: ManifestFile) -> bool:
print(f"Evaluating ManifestFile = {manifest} with partition_filter={self.partition_filter}")
Contributor:
I think this is still around from the debugging? Maybe convert it into a log statement?
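
A sketch of what the suggested log statement could look like; the module-level logger is an assumption, not part of this PR:

import logging

logger = logging.getLogger(__name__)

# inside _ManifestEvalVisitor.eval, instead of the print:
logger.debug("Evaluating ManifestFile=%s with partition_filter=%s", manifest, self.partition_filter)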

if partitions := manifest.partitions:
self.partition_fields = partitions
print(f"self.partition_fields = {self.partition_fields }")
return visit(self.partition_filter, self)

# No partition information
@@ -517,7 +519,7 @@ def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
pos = term.ref().accessor.position
field = self.partition_fields[pos]

if field.lower_bound is None:
if field.lower_bound is None or field.upper_bound is None:
Contributor:
Great catch! Thanks!

Contributor:
If one is None then both should be, so this shouldn't have any effect. But it's fine to update this.

Contributor:
This is required for the linter, since we're using the variable below.

# values are all null and literal cannot contain null
return ROWS_CANNOT_MATCH

Expand All @@ -526,7 +528,7 @@ def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
if lower > literal.value:
return ROWS_CANNOT_MATCH

upper = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
Contributor:
@Fokko, can you check why tests didn't catch this case? We should ensure this is tested properly and separate the fix into its own PR.


if literal.value > upper:
return ROWS_CANNOT_MATCH
@@ -613,6 +615,9 @@ def visit_or(self, left_result: bool, right_result: bool) -> bool:
def manifest_evaluator(
partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True
) -> Callable[[ManifestFile], bool]:
partition_schema = Schema(*partition_spec.partition_type(schema))
partition_type = partition_spec.partition_type(schema)
partition_schema = Schema(*partition_type.fields)
partition_filter = partition_filter or AlwaysTrue()
Contributor:
Since partition_filter cannot be None according to the type, this statement doesn't have any effect

print(f"partition_schema= {partition_schema} partition_filter={partition_filter}")
evaluator = _ManifestEvalVisitor(partition_schema, partition_filter, case_sensitive)
return evaluator.eval
18 changes: 16 additions & 2 deletions python/pyiceberg/table/__init__.py
@@ -14,12 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from typing import Dict, List, Optional

from pydantic import Field

from pyiceberg.expressions import AlwaysTrue, BooleanExpression
from pyiceberg.io import FileIO
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.partitioning import PartitionSpec
@@ -28,6 +28,8 @@
from pyiceberg.typedef import Identifier
from pyiceberg.utils.iceberg_base_model import IcebergBaseModel

ALWAYS_TRUE = AlwaysTrue()


class Table(IcebergBaseModel):
identifier: Identifier = Field()
@@ -90,3 +92,15 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log

def new_scan(self, io: FileIO, snapshot_id: Optional[int] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):
"""Create a new scan for this table."""
from pyiceberg.table.scan import DataTableScan

if not (use_snapshot := snapshot_id or self.metadata.current_snapshot_id):
raise ValueError("Unable to resolve a snapshot to use for this scan.")

if not (snapshot := self.snapshot_by_id(use_snapshot)):
raise ValueError("Unable to resolve a snapshot to use for this scan.")

return DataTableScan(io, self, snapshot, expression)
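
For reference, a minimal usage sketch of the new new_scan API, mirroring the CLI command in console.py above. The catalog construction is elided, and the identifier and import path are assumptions, not part of this PR:

from pyiceberg.io import load_file_io

table = catalog.load_table("db.events")  # "db.events" is an illustrative identifier
io = load_file_io({**catalog.properties, **table.metadata.properties})

for task in table.new_scan(io).plan_files():
    print(task.manifest.manifest_path, task.data_file.file_path)
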
103 changes: 103 additions & 0 deletions python/pyiceberg/table/scan.py
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import itertools
from abc import ABC, abstractmethod
from typing import Iterable, List, Optional

from pydantic import Field

from pyiceberg.expressions import AlwaysTrue, BooleanExpression
from pyiceberg.expressions.visitors import manifest_evaluator
from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, ManifestFile
from pyiceberg.table import PartitionSpec, Snapshot, Table
from pyiceberg.utils.iceberg_base_model import IcebergBaseModel

ALWAYS_TRUE = AlwaysTrue()
Contributor:
I'd prefer not adding these constants everywhere. Just use AlwaysTrue() in place of this. It's a singleton.
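
A sketch of that suggestion applied to the constructor below; the signature is shown in isolation:

# AlwaysTrue() is a singleton (per the comment above), so it is safe as a default value
# and the module-level ALWAYS_TRUE constant can be dropped.
def __init__(self, table: Table, snapshot: Optional[Snapshot] = None,
             expression: BooleanExpression = AlwaysTrue()):
    ...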



class FileScanTask(IcebergBaseModel):
"""A scan task over a range of bytes in a single data file."""

manifest: ManifestFile = Field()
data_file: DataFile = Field()
_residual: BooleanExpression = Field()
spec: PartitionSpec = Field()
start: int = Field(default=0)

@property
def length(self) -> int:
return self.data_file.file_size_in_bytes


class TableScan(ABC):
Contributor:
Why does this API not match the Java API? Is there something more pythonic about this?

Contributor Author:
Just wanted to build the API incrementally and add things as and when needed, otherwise it becomes a bloated PR.

"""API for configuring a table scan."""

table: Table
snapshot: Snapshot
expression: BooleanExpression

def __init__(self, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE):
self.table = table
self.expression = expression or ALWAYS_TRUE
if resolved_snapshot := snapshot or table.current_snapshot():
self.snapshot = resolved_snapshot
else:
raise ValueError("Unable to resolve to a Snapshot to use for the table scan.")

@abstractmethod
def plan_files(self) -> Iterable[FileScanTask]:
Contributor:
In Java, there are several different types of tasks. I'd recommend using the same pattern and not requiring this to be FileScanTask. Introduce an abstract ScanTask and return an iterable of that instead.
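
A sketch of the suggested hierarchy; the names mirror the Java API and are not part of this PR:

from abc import ABC, abstractmethod
from typing import Iterable


class ScanTask(ABC):
    """Marker base class for all scan task types."""


class FileScanTask(ScanTask):
    """A task that scans (a byte range of) a single data file."""


class TableScan(ABC):
    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]:
        """Plan tasks for this scan; concrete scans may yield different task types."""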

"""Plan tasks for this scan where each task reads a single file.

Returns:
Iterable[FileScanTask]: tasks scanning entire files required by this scan
"""


class DataTableScan(TableScan):
"""API for configuring a table scan."""

io: FileIO

def __init__(
self, io: FileIO, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE
):
self.io = io
super().__init__(table, snapshot, expression)
Contributor:
Nit: Convention is to first call the super, and then do the rest

Contributor Author:
Will make the change.
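
A sketch of the reordered constructor mentioned in the nit above:

def __init__(
    self, io: FileIO, table: Table, snapshot: Optional[Snapshot] = None, expression: Optional[BooleanExpression] = ALWAYS_TRUE
):
    super().__init__(table, snapshot, expression)
    self.io = io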


def plan_files(self) -> Iterable[FileScanTask]:
matching_manifests = [
manifest
for manifest in self.snapshot.fetch_manifest_list(self.io)
if manifest_evaluator(
self.table.specs()[manifest.partition_spec_id],
self.table.schemas()[self.snapshot.schema_id] if self.snapshot.schema_id is not None else self.table.schema(),
self.expression,
)(manifest)
]

return itertools.chain.from_iterable(
[self._fetch_file_scan_tasks_for_manifest(manifest) for manifest in matching_manifests]
)

def _fetch_file_scan_tasks_for_manifest(self, manifest: ManifestFile) -> List[FileScanTask]:
manifest_entries = manifest.fetch_manifest_entry(self.io)
data_files = [entry.data_file for entry in manifest_entries]

spec = self.table.specs().get(manifest.partition_spec_id)
# Row level filters to be implemented. Need projections for evaluating residual. Skipping for the time being.
return [FileScanTask(manifest=manifest, data_file=file, spec=spec, residual=None) for file in data_files]
4 changes: 2 additions & 2 deletions python/tests/conftest.py
@@ -932,8 +932,8 @@ def delete(self, location: Union[str, InputFile, OutputFile]):


@pytest.fixture(scope="session", autouse=True)
def LocalFileIOFixture():
return LocalFileIO
def local_file_io() -> LocalFileIO:
return LocalFileIO()


@pytest.fixture(scope="session")
Expand Down
28 changes: 27 additions & 1 deletion python/tests/expressions/test_visitors.py
@@ -1017,7 +1017,7 @@ def test_manifest_evaluator_equal_no_overlap():
assert not _create_manifest_evaluator(expr).eval(manifest)


def test_manifest_evaluator_equal_overlap():
def test_manifest_evaluator_equal_overlap_with_lower_bound():
expr = BoundEqualTo(
term=BoundReference(
field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
@@ -1043,6 +1043,32 @@ def test_manifest_evaluator_equal_overlap():
assert _create_manifest_evaluator(expr).eval(manifest)


def test_manifest_evaluator_equal_overlap_with_upper_bound():
expr = BoundEqualTo(
term=BoundReference(
field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
accessor=Accessor(position=0, inner=None),
),
literal=StringLiteral("b"),
)

manifest = ManifestFile(
manifest_path="",
manifest_length=0,
partition_spec_id=0,
partitions=[
PartitionFieldSummary(
contains_null=False,
contains_nan=False,
lower_bound=_to_byte_buffer(StringType(), "a"),
upper_bound=_to_byte_buffer(StringType(), "b"),
)
],
)

assert _create_manifest_evaluator(expr).eval(manifest)


def test_manifest_evaluator_equal_all_null():
expr = BoundEqualTo(
term=BoundReference(