@run.group()
def scan():
    """Create a table scan."""


# NOTE: click renders the docstrings of the group and the command as CLI help
# text, so they are part of the user-facing output of this module.
@scan.command("table")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def scan_table(ctx: Context, identifier: str):
    """Lists all the data files that will be scanned as part of the table scan plan."""
    catalog, output = _catalog_and_output(ctx)
    loaded_table = catalog.load_table(identifier)

    # Table-level properties are unpacked last, so they win over catalog-level
    # ones when both define the same key.
    io_properties = {**catalog.properties, **loaded_table.metadata.properties}
    file_io = load_file_io(io_properties)

    # Plan the scan first, then look up the snapshot id that is shown as the
    # root of the rendered plan.
    planned_tasks = loaded_table.new_scan(file_io).plan_files()
    current = loaded_table.current_snapshot()
    current_id = None if current is None else current.snapshot_id

    output.scan_plan_files(planned_tasks, current_id)
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
    """Render the planned data files as a tree on the console.

    The tree is rooted at the snapshot id; every distinct manifest path gets
    one branch, and each planned data file is listed under the manifest that
    references it.

    Args:
        plan_files: The file scan tasks produced by planning a table scan.
        snapshot_id: Id shown at the root of the tree (printed as-is, so
            ``None`` is rendered as "None").
    """
    root = Tree(f"Snapshot: {snapshot_id}")

    # Manifest path -> tree branch, created the first time the path is seen.
    branches = {}
    for task in plan_files:
        manifest_path = task.manifest.manifest_path
        if manifest_path not in branches:
            branches[manifest_path] = root.add(f"Manifest: {manifest_path}")
        branches[manifest_path].add(f"Data File: {task.data_file.file_path}")

    Console().print(root)
def scan_plan_files(self, plan_files: Iterable[FileScanTask], snapshot_id: int | None = None) -> None:
    """Emit the scan plan as a JSON document.

    Previously this was a silent no-op, so requesting JSON output for a table
    scan printed nothing at all. The plan is now written through ``_out`` like
    the other JSON outputs of this class.

    The emitted document has the shape::

        {"snapshot-id": <id or null>,
         "manifests": [{"manifest-path": ..., "data-files": [...]}, ...]}

    Manifests appear in the order they are first encountered in
    ``plan_files``; data files keep their planning order.

    Args:
        plan_files: The file scan tasks produced by planning a table scan.
        snapshot_id: Id of the snapshot the plan belongs to, if known.
    """
    # Group data file paths by manifest path (dicts preserve insertion order).
    files_by_manifest: dict[str, list[str]] = {}
    for task in plan_files:
        files_by_manifest.setdefault(task.manifest.manifest_path, []).append(task.data_file.file_path)

    self._out(
        {
            "snapshot-id": snapshot_id,
            "manifests": [
                {"manifest-path": manifest_path, "data-files": data_files}
                for manifest_path, data_files in files_by_manifest.items()
            ],
        }
    )
a/python/pyiceberg/table/__init__.py +++ b/python/pyiceberg/table/__init__.py @@ -14,12 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - - -from typing import Dict, List, Optional +from __future__ import annotations from pydantic import Field +from pyiceberg.expressions import BooleanExpression +from pyiceberg.io import FileIO from pyiceberg.schema import Schema from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.partitioning import PartitionSpec @@ -42,7 +42,7 @@ def schema(self) -> Schema: """Return the schema for this table""" return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id) - def schemas(self) -> Dict[int, Schema]: + def schemas(self) -> dict[int, Schema]: """Return a dict of the schema of this table""" return {schema.schema_id: schema for schema in self.metadata.schemas} @@ -50,7 +50,7 @@ def spec(self) -> PartitionSpec: """Return the partition spec of this table""" return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id) - def specs(self) -> Dict[int, PartitionSpec]: + def specs(self) -> dict[int, PartitionSpec]: """Return a dict the partition specs this table""" return {spec.spec_id: spec for spec in self.metadata.partition_specs} @@ -60,7 +60,7 @@ def sort_order(self) -> SortOrder: sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id ) - def sort_orders(self) -> Dict[int, SortOrder]: + def sort_orders(self) -> dict[int, SortOrder]: """Return a dict of the sort orders of this table""" return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders} @@ -68,25 +68,32 @@ def location(self) -> str: """Return the table's base location.""" return self.metadata.location - def current_snapshot(self) -> Optional[Snapshot]: + def current_snapshot(self) -> 
def new_scan(self, io: FileIO, snapshot_id: int | None = None, expression: BooleanExpression | None = None):
    """Create a new scan for this table.

    Args:
        io: The FileIO handed to the scan for reading table files.
        snapshot_id: Id of the snapshot to scan. When omitted, no snapshot is
            passed on and the scan resolves the table's current snapshot.
        expression: Optional filter expression passed on to the scan.

    Returns:
        A ``DataTableScan`` over this table.

    Raises:
        ValueError: If ``snapshot_id`` is given but no snapshot with that id
            exists. Without this check the failed lookup yields ``None`` and
            the scan would silently read the current snapshot instead of the
            requested one.
    """
    snapshot = None
    if snapshot_id is not None:
        snapshot = self.snapshot_by_id(snapshot_id)
        if snapshot is None:
            raise ValueError(f"Snapshot with id {snapshot_id} does not exist in this table.")

    # Imported lazily: pyiceberg.table.scan itself imports Table from this
    # package, so a module-level import would be circular.
    from pyiceberg.table.scan import DataTableScan

    return DataTableScan(io, self, snapshot, expression)
class FileScanTask(ABC):
    """A scan task describing one data file to read.

    The task currently always covers the whole file: ``start`` is 0 and
    ``length`` is the file size recorded on the data file.
    """

    manifest: ManifestFile  # manifest the data file was listed in
    data_file: DataFile  # the data file to read
    residual: BooleanExpression | None  # row-level filter still to apply, if any
    spec: PartitionSpec | None  # partition spec the manifest was written with, if known

    def __init__(
        self,
        manifest: ManifestFile,
        data_file: DataFile,
        spec: PartitionSpec | None = None,
        residual: BooleanExpression | None = None,
    ):
        self.manifest = manifest
        self.data_file = data_file
        self.spec = spec
        self.residual = residual

    @property
    def start(self) -> int:
        """Byte offset at which the task starts reading (always the file start)."""
        return 0

    @property
    def length(self) -> int:
        """Number of bytes covered by the task (the full data file size)."""
        return self.data_file.file_size_in_bytes


class TableScan(ABC):
    """API for configuring a table scan.

    Raises:
        ValueError: On construction, when no snapshot is passed and the table
            has no current snapshot either.
    """

    table: Table
    snapshot: Snapshot
    expression: BooleanExpression

    def __init__(self, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
        self.table = table
        # No filter means "match everything".
        self.expression = AlwaysTrue() if expression is None else expression

        # Fall back to the table's current snapshot when none is given.
        if snapshot is None:
            snapshot = table.current_snapshot()
        if snapshot is None:
            raise ValueError("Unable to resolve to a Snapshot to use for the table scan.")
        self.snapshot = snapshot

    @abstractmethod
    def plan_files(self) -> Iterable[FileScanTask]:
        """Plan tasks for this scan where each task reads a single file.

        Returns:
            An iterable of tasks, one per data file required by this scan.
        """


class DataTableScan(TableScan):
    """A table scan that plans file scan tasks from the snapshot's manifests."""

    io: FileIO

    def __init__(self, io: FileIO, table: Table, snapshot: Snapshot | None = None, expression: BooleanExpression | None = None):
        self.io = io
        super().__init__(table, snapshot, expression)

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plan one task per data file of every manifest matching the scan filter.

        Returns:
            An iterable over the file scan tasks of all matching manifests.
        """
        # One evaluator per partition spec id: building it is loop-invariant
        # for all manifests written with the same spec.
        evaluators: dict = {}
        matching_manifests = []
        for manifest in self.snapshot.fetch_manifest_list(self.io):
            spec_id = manifest.partition_spec_id
            if spec_id not in evaluators:
                evaluators[spec_id] = self._manifest_evaluator_for(spec_id)
            # The evaluator is a callable and has to be invoked with the
            # manifest; testing the callable itself is always truthy and
            # would never filter anything out.
            if evaluators[spec_id](manifest):
                matching_manifests.append(manifest)

        return itertools.chain.from_iterable(
            [self._fetch_file_scan_tasks_for_manifest(manifest) for manifest in matching_manifests]
        )

    def _manifest_evaluator_for(self, spec_id: int):
        """Build the manifest filter (a ManifestFile -> bool callable) for one partition spec id."""
        snapshot_schema_id = self.snapshot.schema_id
        # Prefer the schema the snapshot was written with; otherwise use the
        # table's current schema.
        schema = self.table.schemas()[snapshot_schema_id] if snapshot_schema_id is not None else self.table.schema()
        return manifest_evaluator(self.table.specs()[spec_id], schema, self.expression)

    def _fetch_file_scan_tasks_for_manifest(self, manifest: ManifestFile) -> list[FileScanTask]:
        """Read the entries of one manifest and wrap each data file in a task."""
        manifest_entries = manifest.fetch_manifest_entry(self.io)
        spec = self.table.specs().get(manifest.partition_spec_id)
        # Row level filters to be implemented. Need projections for evaluating residual. Skipping for the time being.
        return [FileScanTask(manifest, entry.data_file, spec, residual=None) for entry in manifest_entries]