Python: TableScan Plan files API implementation without residual evaluation #6069

Closed
Changes from 2 commits
21 changes: 21 additions & 0 deletions python/pyiceberg/cli/console.py
@@ -361,3 +361,24 @@ def table(ctx: Context, identifier: str, property_name: str): # noqa: F811
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")


@run.group()
def scan():
"""Create a table scan."""


@scan.command("table")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def scan_table(ctx: Context, identifier: str):
Contributor

I like that we can easily visualize the files. I think we should merge this one with the files command since they are so similar. WDYT?

➜  python git:(master) ✗ pyiceberg --verbose true --catalog fokko files hive_test.h1
Snapshots: fokko.hive_test.h1
└── Snapshot 4362649644256929116, schema 0: 
    s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/snap-4362649644256929116-1-63e88d4d-1b30-4cae-b7a0-f53b0b54758a.avro
    ├── Manifest: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/63e88d4d-1b30-4cae-b7a0-f53b0b54758a-m0.avro
    │   └── Datafile: 
    │       s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/data/0bfca5ff//00000-0-dweeks_20220901095219_f181f97a-43ce-4aa6-8cec-6956d83b7a42-job_lo
    │       cal2002913683_0002-00001.parquet
    └── Manifest: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/metadata/508fd9ac-520f-47d8-900f-00f1dda051ac-m0.avro
        └── Datafile: s3://bucket/41246f47-e1bc-465e-89b4-6d039a9d55dc/67ad77ac-d90b-4472-b1e9-672d91a1ac41/data/3d030bcf/00000-0-44897130-c494-4dca-a164-e874c1e37c36-00001.parquet

Contributor Author

Not sure I follow you, @Fokko. Do you mean making scan a sub-command of the files command? Or just having the files command use the table.new_scan().plan_files() API underneath?

If it's the latter, I like that.

Contributor

I think the latter is more flexible, but then we should remove the files command because the two overlap so much.

Contributor Author

The only difference is that files currently lists all snapshots, whereas the table scan's plan_files() only lists the snapshot that is being used for the scan.

Contributor Author

Maybe we can give files three options:

  • files --snapshot=all - The current files behavior
  • files --snapshot=current - Files under the current snapshot
  • files --snapshot=<snapshot_id> - Files under the snapshot_id specified
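
The three proposed modes could be resolved to a list of snapshot ids roughly as follows. This is only a sketch of the idea: `resolve_snapshot_ids` and its parameters are hypothetical names, not part of the PyIceberg CLI, and a real command would take the ids from the table metadata.

```python
from typing import List, Optional


def resolve_snapshot_ids(option: str, current_id: Optional[int], all_ids: List[int]) -> List[int]:
    """Map a hypothetical --snapshot value to the snapshot ids whose files should be listed."""
    if option == "all":
        # Existing `files` behaviour: every snapshot of the table.
        return list(all_ids)
    if option == "current":
        # Only the current snapshot; a table without one yields nothing.
        return [] if current_id is None else [current_id]
    # Anything else is expected to be an explicit snapshot id.
    try:
        snapshot_id = int(option)
    except ValueError:
        raise ValueError(f"Expected 'all', 'current' or a snapshot id, got: {option!r}") from None
    if snapshot_id not in all_ids:
        raise ValueError(f"Snapshot {snapshot_id} does not exist")
    return [snapshot_id]
```

Keeping the resolution in one small function would let both the tree output and a JSON output share the same selection logic.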

Contributor

Why not move it to the scan subcommand? This way we don't have similar commands

"""Lists all the data files that will be scanned as part of the table scan plan."""
catalog, output = _catalog_and_output(ctx)

catalog_table = catalog.load_table(identifier)
io = load_file_io({**catalog.properties, **catalog_table.metadata.properties})
file_scan_tasks = catalog_table.new_scan(io).plan_files()
snapshot = catalog_table.current_snapshot()
snapshot_id = snapshot.snapshot_id if snapshot is not None else None
output.scan_plan_files(file_scan_tasks, snapshot_id)
37 changes: 30 additions & 7 deletions python/pyiceberg/cli/output.py
@@ -14,9 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
Contributor

As a community, we should probably make a decision on this one. Sometimes we use it, but not all the time. Personally, I find Optional[str] easier to read than str | None.

Contributor Author

I like Optional[str] too, but for some reason, when I run make lint locally, it gets rewritten into the latter format, i.e. str | None. Do we need to tweak the linter config to enforce one format over the other?

Contributor

This is because of the import statement. PyUpgrade will automatically rewrite the annotations: https://github.com/asottile/pyupgrade#pep-604-typing-rewrites We can change this in another PR.

Contributor Author

In a recent commit, I've reverted to not using from __future__ import annotations.

Contributor

It should be fixed now that #6114 is in. Delayed loading of the annotations (that's what the import does) can be helpful sometimes :)
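
For readers unfamiliar with the two spellings, here is a small illustration of what "delayed loading" means. It is a sketch using a quoted annotation to mimic the effect; with `from __future__ import annotations` at the top of a module, every annotation in that module is stored as a string in the same way. The function names are made up for the example.

```python
from typing import Optional, Union


def with_optional(name: Optional[str] = None) -> None:
    """The annotation is evaluated eagerly into a typing object."""


def with_string(name: "str | None" = None) -> None:
    """The annotation is kept as a plain string and only evaluated on demand."""


# Optional[X] is shorthand for Union[X, None].
assert Optional[str] == Union[str, None]
assert with_optional.__annotations__["name"] == Optional[str]

# A string annotation is stored verbatim and never evaluated at definition
# time, which is why `str | None` is accepted here on older interpreters.
assert with_string.__annotations__["name"] == "str | None"
```

Because the string form is never evaluated when the function is defined, tools such as PyUpgrade can safely rewrite `Optional[str]` to `str | None` in modules that carry the import.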


import json
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import Any, Iterable
from uuid import UUID

from rich.console import Console
@@ -27,6 +29,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.partitioning import PartitionSpec
from pyiceberg.table.scan import FileScanTask
from pyiceberg.typedef import Identifier, Properties


@@ -38,7 +41,7 @@ def exception(self, ex: Exception) -> None:
...

@abstractmethod
def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
...

@abstractmethod
@@ -49,6 +52,10 @@ def describe_table(self, table: Table) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
...

@abstractmethod
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
...

@abstractmethod
def describe_properties(self, properties: Properties) -> None:
...
@@ -66,7 +73,7 @@ def spec(self, spec: PartitionSpec) -> None:
...

@abstractmethod
def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
...


@@ -88,7 +95,7 @@ def exception(self, ex: Exception) -> None:
else:
Console(stderr=True).print(ex)

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
Contributor

Suggested change
def identifiers(self, identifiers: list[Identifier]) -> None:
def identifiers(self, identifiers: List[Identifier]) -> None:

table = self._table
for identifier in identifiers:
table.add_row(".".join(identifier))
@@ -146,6 +153,19 @@ def files(self, table: Table, io: FileIO, history: bool) -> None:
manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
Console().print(snapshot_tree)

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
snapshot_tree = Tree(f"Snapshot: {snapshot_id}")

manifest_dict = {}
for file in plan_files:
if file.manifest.manifest_path not in manifest_dict:
Contributor

Nit: Should we move file.manifest.manifest_path into a local variable, since we use it three times?
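
A sketch of what the suggested refactor could look like. The dataclasses below are simplified stand-ins for the PR's manifest, data file and task types, and a plain dict of lists replaces the Rich tree so the example runs on its own; all of these names are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class Manifest:
    manifest_path: str


@dataclass
class DataFile:
    file_path: str


@dataclass
class Task:
    manifest: Manifest
    data_file: DataFile


def group_files_by_manifest(plan_files: Iterable[Task]) -> Dict[str, List[str]]:
    """Group data file paths under the manifest that lists them."""
    manifest_dict: Dict[str, List[str]] = {}
    for file in plan_files:
        # Look the path up once per task instead of three times.
        manifest_path = file.manifest.manifest_path
        if manifest_path not in manifest_dict:
            manifest_dict[manifest_path] = []
        manifest_dict[manifest_path].append(file.data_file.file_path)
    return manifest_dict
```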

manifest_tree = snapshot_tree.add(f"Manifest: {file.manifest.manifest_path}")
manifest_dict[file.manifest.manifest_path] = manifest_tree

manifest_dict[file.manifest.manifest_path].add(f"Data File: {file.data_file.file_path}")

Console().print(snapshot_tree)

def describe_properties(self, properties: Properties) -> None:
output_table = self._table
for k, v in properties.items():
@@ -164,7 +184,7 @@ def schema(self, schema: Schema) -> None:
def spec(self, spec: PartitionSpec) -> None:
Console().print(str(spec))

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
Contributor

Suggested change
def uuid(self, uuid: UUID | None) -> None:
def uuid(self, uuid: Optional[UUID]) -> None:

Console().print(str(uuid) if uuid else "missing")


@@ -182,7 +202,7 @@ def _out(self, d: Any) -> None:
def exception(self, ex: Exception) -> None:
self._out({"type": ex.__class__.__name__, "message": str(ex)})

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
Contributor

Suggested change
def identifiers(self, identifiers: list[Identifier]) -> None:
def identifiers(self, identifiers: List[Identifier]) -> None:

self._out([".".join(identifier) for identifier in identifiers])

def describe_table(self, table: Table) -> None:
@@ -200,8 +220,11 @@ def schema(self, schema: Schema) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
pass

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
pass

def spec(self, spec: PartitionSpec) -> None:
print(spec.json())

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
Contributor

Suggested change
def uuid(self, uuid: UUID | None) -> None:
def uuid(self, uuid: Optional[UUID]) -> None:

self._out({"uuid": str(uuid) if uuid else "missing"})
4 changes: 3 additions & 1 deletion python/pyiceberg/expressions/visitors.py
@@ -613,6 +613,8 @@ def visit_or(self, left_result: bool, right_result: bool) -> bool:
def manifest_evaluator(
partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True
) -> Callable[[ManifestFile], bool]:
partition_schema = Schema(*partition_spec.partition_type(schema))
partition_type = partition_spec.partition_type(schema)
partition_schema = Schema(*partition_type.fields)
partition_filter = partition_filter if partition_filter is not None else AlwaysTrue()
evaluator = _ManifestEvalVisitor(partition_schema, partition_filter, case_sensitive)
return evaluator.eval
27 changes: 17 additions & 10 deletions python/pyiceberg/table/__init__.py
@@ -14,12 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from typing import Dict, List, Optional
from __future__ import annotations

from pydantic import Field

from pyiceberg.expressions import BooleanExpression
from pyiceberg.io import FileIO
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.partitioning import PartitionSpec
@@ -42,15 +42,15 @@ def schema(self) -> Schema:
"""Return the schema for this table"""
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

def schemas(self) -> Dict[int, Schema]:
def schemas(self) -> dict[int, Schema]:
"""Return a dict of the schema of this table"""
return {schema.schema_id: schema for schema in self.metadata.schemas}

def spec(self) -> PartitionSpec:
"""Return the partition spec of this table"""
return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

def specs(self) -> Dict[int, PartitionSpec]:
def specs(self) -> dict[int, PartitionSpec]:
"""Return a dict the partition specs this table"""
return {spec.spec_id: spec for spec in self.metadata.partition_specs}

@@ -60,33 +60,40 @@ def sort_order(self) -> SortOrder:
sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
)

def sort_orders(self) -> Dict[int, SortOrder]:
def sort_orders(self) -> dict[int, SortOrder]:
"""Return a dict of the sort orders of this table"""
return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location

def current_snapshot(self) -> Optional[Snapshot]:
def current_snapshot(self) -> Snapshot | None:
"""Get the current snapshot for this table, or None if there is no current snapshot."""
if snapshot_id := self.metadata.current_snapshot_id:
return self.snapshot_by_id(snapshot_id)
return None

def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
def snapshot_by_id(self, snapshot_id: int) -> Snapshot | None:
"""Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
try:
return next(snapshot for snapshot in self.metadata.snapshots if snapshot.snapshot_id == snapshot_id)
except StopIteration:
return None

def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
def snapshot_by_name(self, name: str) -> Snapshot | None:
"""Returns the snapshot referenced by the given name or null if no such reference exists."""
if ref := self.metadata.refs.get(name):
return self.snapshot_by_id(ref.snapshot_id)
return None

def history(self) -> List[SnapshotLogEntry]:
def history(self) -> list[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log

def new_scan(self, io: FileIO, snapshot_id: int | None = None, expression: BooleanExpression | None = None):
"""Create a new scan for this table."""
from pyiceberg.table.scan import DataTableScan

snapshot = self.snapshot_by_id(snapshot_id) if snapshot_id is not None else None
return DataTableScan(io, self, snapshot, expression)
115 changes: 115 additions & 0 deletions python/pyiceberg/table/scan.py
@@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import itertools
from abc import ABC, abstractmethod
from typing import Iterable

from pyiceberg.expressions import AlwaysTrue, BooleanExpression
from pyiceberg.expressions.visitors import manifest_evaluator
from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, ManifestFile
from pyiceberg.table import PartitionSpec, Snapshot, Table


class FileScanTask(ABC):
"""A scan task over a range of bytes in a single data file."""

manifest: ManifestFile
data_file: DataFile
residual: BooleanExpression | None
spec: PartitionSpec | None

def __init__(
self,
manifest: ManifestFile,
data_file: DataFile,
spec: PartitionSpec | None = None,
residual: BooleanExpression | None = None,
):
self.manifest = manifest
self.data_file = data_file
self.spec = spec
self.residual = residual

@property
def start(self) -> int:
return 0

@property
def length(self) -> int:
return self.data_file.file_size_in_bytes


class TableScan(ABC):
Contributor

Why does this API not match the Java API? Is there something more pythonic about this?

Contributor Author

Just wanted to build the API incrementally and add things as and when needed, otherwise it becomes a bloated PR.

"""API for configuring a table scan."""

table: Table
snapshot: Snapshot
expression: BooleanExpression

def __init__(self, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
self.table = table
self.expression = AlwaysTrue() if expression is None else expression
if snapshot is None:
snapshot = table.current_snapshot()
if snapshot is not None:
self.snapshot = snapshot
else:
raise ValueError("Unable to resolve to a Snapshot to use for the table scan.")

@abstractmethod
def plan_files(self) -> Iterable[FileScanTask]:
Contributor

In Java, there are several different types of tasks. I'd recommend using the same pattern and not requiring this to be FileScanTask. Introduce an abstract ScanTask and return an iterable of that instead.
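
One way the suggestion might look in Python is sketched below. The class bodies are deliberately minimal and `InMemoryScan` is a toy stand-in invented for this example; only the idea of returning an iterable of an abstract `ScanTask` comes from the comment above.

```python
from abc import ABC, abstractmethod
from typing import Iterable, List, Tuple


class ScanTask(ABC):
    """Hypothetical base type for any unit of work a scan can plan."""


class FileScanTask(ScanTask):
    """A task that reads one whole data file."""

    def __init__(self, file_path: str, length: int):
        self.file_path = file_path
        self.start = 0
        self.length = length


class TableScan(ABC):
    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]:
        """Return the tasks required by this scan."""


class InMemoryScan(TableScan):
    """Toy scan over (path, size) pairs, standing in for a real table."""

    def __init__(self, files: List[Tuple[str, int]]):
        self._files = files

    def plan_files(self) -> Iterable[ScanTask]:
        return [FileScanTask(path, size) for path, size in self._files]
```

Callers that only iterate over the tasks then depend on `ScanTask` alone, so other task types could be added later without changing the `plan_files` signature.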

"""Plan tasks for this scan where each task reads a single file.

Returns:
Table: a tuple of tasks scanning entire files required by this scan
"""


class DataTableScan(TableScan):
"""API for configuring a table scan."""

io: FileIO

def __init__(self, io: FileIO, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
self.io = io
super().__init__(table, snapshot, expression)
Contributor

Nit: The convention is to call super() first, and then do the rest.

Contributor Author

Will make the change.
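
For illustration, the reordered constructor might look like this. The classes are reduced to plain strings so the snippet stands alone; they are not the PR's actual signatures.

```python
class TableScan:
    def __init__(self, table: str):
        self.table = table


class DataTableScan(TableScan):
    def __init__(self, io: str, table: str):
        # Initialise the base class first, then set subclass state.
        super().__init__(table)
        self.io = io
```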


def plan_files(self) -> Iterable[FileScanTask]:
matching_manifests = [
manifest
for manifest in self.snapshot.fetch_manifest_list(self.io)
if manifest_evaluator(
self.table.specs()[manifest.partition_spec_id],
self.table.schemas()[self.snapshot.schema_id] if self.snapshot.schema_id is not None else self.table.schema(),
self.expression,
)(manifest)
]

return itertools.chain.from_iterable(
[self._fetch_file_scan_tasks_for_manifest(manifest) for manifest in matching_manifests]
)

def _fetch_file_scan_tasks_for_manifest(self, manifest: ManifestFile) -> list[FileScanTask]:
manifest_entries = manifest.fetch_manifest_entry(self.io)
data_files = [entry.data_file for entry in manifest_entries]

spec = self.table.specs().get(manifest.partition_spec_id)
# Row level filters to be implemented. Need projections for evaluating residual. Skipping for the time being.
return [FileScanTask(manifest, file, spec, residual=None) for file in data_files]
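
The planning flow in this file (filter the manifests first, then flatten the per-manifest task lists) can be sketched without any Iceberg types. In the example below, manifests are modelled as a plain dict from manifest path to data file paths and `keep_manifest` is a hypothetical predicate playing the role of the manifest evaluator; neither is part of the PR.

```python
import itertools
from typing import Callable, Dict, Iterator, List


def plan_files(manifests: Dict[str, List[str]], keep_manifest: Callable[[str], bool]) -> Iterator[str]:
    """Yield the data files of every manifest that passes the filter."""
    # Step 1: prune whole manifests that cannot contain matching data.
    matching = [path for path in manifests if keep_manifest(path)]
    # Step 2: flatten the per-manifest file lists into a single stream.
    return itertools.chain.from_iterable(manifests[path] for path in matching)
```

Pruning at the manifest level means the files of a rejected manifest are never touched, which is the main saving before any row-level (residual) filtering is added.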