Skip to content

Commit

Permalink
Python: TableScan Plan files API implementation without row-level fil…
Browse files Browse the repository at this point in the history
…ters evaluation. (apache#3229)
  • Loading branch information
dhruv-pratap committed Oct 27, 2022
1 parent 8271791 commit 7e30879
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 18 deletions.
21 changes: 21 additions & 0 deletions python/pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,24 @@ def table(ctx: Context, identifier: str, property_name: str): # noqa: F811
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")


@run.group()
def scan():
"""Create a table scan."""


@scan.command("table")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def scan_table(ctx: Context, identifier: str):
"""Lists all the data files that will be scanned as part of the table scan plan."""
catalog, output = _catalog_and_output(ctx)

catalog_table = catalog.load_table(identifier)
io = load_file_io({**catalog.properties, **catalog_table.metadata.properties})
file_scan_tasks = catalog_table.new_scan(io).plan_files()
snapshot = catalog_table.current_snapshot()
snapshot_id = snapshot.snapshot_id if snapshot is not None else None
output.scan_plan_files(file_scan_tasks, snapshot_id)
37 changes: 30 additions & 7 deletions python/pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import Any, Iterable
from uuid import UUID

from rich.console import Console
Expand All @@ -27,6 +29,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.partitioning import PartitionSpec
from pyiceberg.table.scan import FileScanTask
from pyiceberg.typedef import Identifier, Properties


Expand All @@ -38,7 +41,7 @@ def exception(self, ex: Exception) -> None:
...

@abstractmethod
def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
...

@abstractmethod
Expand All @@ -49,6 +52,10 @@ def describe_table(self, table: Table) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
...

@abstractmethod
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
...

@abstractmethod
def describe_properties(self, properties: Properties) -> None:
...
Expand All @@ -66,7 +73,7 @@ def spec(self, spec: PartitionSpec) -> None:
...

@abstractmethod
def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
...


Expand All @@ -88,7 +95,7 @@ def exception(self, ex: Exception) -> None:
else:
Console(stderr=True).print(ex)

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
table = self._table
for identifier in identifiers:
table.add_row(".".join(identifier))
Expand Down Expand Up @@ -146,6 +153,19 @@ def files(self, table: Table, io: FileIO, history: bool) -> None:
manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
Console().print(snapshot_tree)

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
snapshot_tree = Tree(f"Snapshot: {snapshot_id}")

manifest_dict = {}
for file in plan_files:
if file.manifest.manifest_path not in manifest_dict:
manifest_tree = snapshot_tree.add(f"Manifest: {file.manifest.manifest_path}")
manifest_dict[file.manifest.manifest_path] = manifest_tree

manifest_dict[file.manifest.manifest_path].add(f"Data File: {file.data_file.file_path}")

Console().print(snapshot_tree)

def describe_properties(self, properties: Properties) -> None:
output_table = self._table
for k, v in properties.items():
Expand All @@ -164,7 +184,7 @@ def schema(self, schema: Schema) -> None:
def spec(self, spec: PartitionSpec) -> None:
Console().print(str(spec))

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
Console().print(str(uuid) if uuid else "missing")


Expand All @@ -182,7 +202,7 @@ def _out(self, d: Any) -> None:
def exception(self, ex: Exception) -> None:
self._out({"type": ex.__class__.__name__, "message": str(ex)})

def identifiers(self, identifiers: List[Identifier]) -> None:
def identifiers(self, identifiers: list[Identifier]) -> None:
self._out([".".join(identifier) for identifier in identifiers])

def describe_table(self, table: Table) -> None:
Expand All @@ -200,8 +220,11 @@ def schema(self, schema: Schema) -> None:
def files(self, table: Table, io: FileIO, history: bool) -> None:
pass

def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
pass

def spec(self, spec: PartitionSpec) -> None:
print(spec.json())

def uuid(self, uuid: Optional[UUID]) -> None:
def uuid(self, uuid: UUID | None) -> None:
self._out({"uuid": str(uuid) if uuid else "missing"})
4 changes: 3 additions & 1 deletion python/pyiceberg/expressions/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ def visit_or(self, left_result: bool, right_result: bool) -> bool:
def manifest_evaluator(
partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True
) -> Callable[[ManifestFile], bool]:
partition_schema = Schema(*partition_spec.partition_type(schema))
partition_type = partition_spec.partition_type(schema)
partition_schema = Schema(*partition_type.fields)
partition_filter = partition_filter if partition_filter is not None else AlwaysTrue()
evaluator = _ManifestEvalVisitor(partition_schema, partition_filter, case_sensitive)
return evaluator.eval
27 changes: 17 additions & 10 deletions python/pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from typing import Dict, List, Optional
from __future__ import annotations

from pydantic import Field

from pyiceberg.expressions import BooleanExpression
from pyiceberg.io import FileIO
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.partitioning import PartitionSpec
Expand All @@ -42,15 +42,15 @@ def schema(self) -> Schema:
"""Return the schema for this table"""
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

def schemas(self) -> Dict[int, Schema]:
def schemas(self) -> dict[int, Schema]:
"""Return a dict of the schema of this table"""
return {schema.schema_id: schema for schema in self.metadata.schemas}

def spec(self) -> PartitionSpec:
"""Return the partition spec of this table"""
return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

def specs(self) -> Dict[int, PartitionSpec]:
def specs(self) -> dict[int, PartitionSpec]:
"""Return a dict the partition specs this table"""
return {spec.spec_id: spec for spec in self.metadata.partition_specs}

Expand All @@ -60,33 +60,40 @@ def sort_order(self) -> SortOrder:
sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
)

def sort_orders(self) -> Dict[int, SortOrder]:
def sort_orders(self) -> dict[int, SortOrder]:
"""Return a dict of the sort orders of this table"""
return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location

def current_snapshot(self) -> Optional[Snapshot]:
def current_snapshot(self) -> Snapshot | None:
"""Get the current snapshot for this table, or None if there is no current snapshot."""
if snapshot_id := self.metadata.current_snapshot_id:
return self.snapshot_by_id(snapshot_id)
return None

def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
def snapshot_by_id(self, snapshot_id: int) -> Snapshot | None:
"""Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
try:
return next(snapshot for snapshot in self.metadata.snapshots if snapshot.snapshot_id == snapshot_id)
except StopIteration:
return None

def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
def snapshot_by_name(self, name: str) -> Snapshot | None:
"""Returns the snapshot referenced by the given name or null if no such reference exists."""
if ref := self.metadata.refs.get(name):
return self.snapshot_by_id(ref.snapshot_id)
return None

def history(self) -> List[SnapshotLogEntry]:
def history(self) -> list[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log

def new_scan(self, io: FileIO, snapshot_id: int | None = None, expression: BooleanExpression | None = None):
"""Create a new scan for this table."""
from pyiceberg.table.scan import DataTableScan

snapshot = self.snapshot_by_id(snapshot_id) if snapshot_id is not None else None
return DataTableScan(io, self, snapshot, expression)
115 changes: 115 additions & 0 deletions python/pyiceberg/table/scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import itertools
from abc import ABC, abstractmethod
from typing import Iterable

from pyiceberg.expressions import AlwaysTrue, BooleanExpression
from pyiceberg.expressions.visitors import manifest_evaluator
from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, ManifestFile
from pyiceberg.table import PartitionSpec, Snapshot, Table


class FileScanTask(ABC):
"""A scan task over a range of bytes in a single data file."""

manifest: ManifestFile
data_file: DataFile
residual: BooleanExpression | None
spec: PartitionSpec | None

def __init__(
self,
manifest: ManifestFile,
data_file: DataFile,
spec: PartitionSpec | None = None,
residual: BooleanExpression | None = None,
):
self.manifest = manifest
self.data_file = data_file
self.spec = spec
self.residual = residual

@property
def start(self) -> int:
return 0

@property
def length(self) -> int:
return self.data_file.file_size_in_bytes


class TableScan(ABC):
"""API for configuring a table scan."""

table: Table
snapshot: Snapshot
expression: BooleanExpression

def __init__(self, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
self.table = table
self.expression = AlwaysTrue() if expression is None else expression
if snapshot is None:
snapshot = table.current_snapshot()
if snapshot is not None:
self.snapshot = snapshot
else:
raise ValueError("Unable to resolve to a Snapshot to use for the table scan.")

@abstractmethod
def plan_files(self) -> Iterable[FileScanTask]:
"""Plan tasks for this scan where each task reads a single file.
Returns:
Table: a tuple of tasks scanning entire files required by this scan
"""


class DataTableScan(TableScan):
"""API for configuring a table scan."""

io: FileIO

def __init__(self, io: FileIO, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
self.io = io
super().__init__(table, snapshot, expression)

def plan_files(self) -> Iterable[FileScanTask]:
matching_manifests = [
manifest
for manifest in self.snapshot.fetch_manifest_list(self.io)
if manifest_evaluator(
self.table.specs()[manifest.partition_spec_id],
self.table.schemas()[self.snapshot.schema_id] if self.snapshot.schema_id is not None else self.table.schema(),
self.expression,
)
]

return itertools.chain.from_iterable(
[self._fetch_file_scan_tasks_for_manifest(manifest) for manifest in matching_manifests]
)

def _fetch_file_scan_tasks_for_manifest(self, manifest: ManifestFile) -> list[FileScanTask]:
manifest_entries = manifest.fetch_manifest_entry(self.io)
data_files = [entry.data_file for entry in manifest_entries]

spec = self.table.specs().get(manifest.partition_spec_id)
# Row level filters to be implemented. Need projections for evaluating residual. Skipping for the time being.
return [FileScanTask(manifest, file, spec, residual=None) for file in data_files]

0 comments on commit 7e30879

Please sign in to comment.