Python: Alter table plumbing and REST support (#6323)
* Alter table

* Make the CI happy

* Comments

* Thanks Ryan!

* Python: Bump dependencies to the latest version

* Remove from docs

* WIP

* Comments

* Make CI happy

* Update docstrings

* Do some renaming

* Add a context manager

* Rename commit to commit_transaction()

* Update docs

* Refresh in place

* Remove redundant call

* Load a fresh copy instead

* Fix the docstrings

* Restore CommitTableResponse

* Conflicts
Fokko authored Jun 22, 2023
1 parent 5c64100 commit 80aee85
Showing 16 changed files with 660 additions and 201 deletions.
168 changes: 29 additions & 139 deletions python/mkdocs/docs/api.md
@@ -78,110 +78,15 @@ catalog.load_table(("nyc", "taxis"))
# The tuple syntax can be used if the namespace or table contains a dot.
```

This returns a `Table` that represents an Iceberg table:

```python
Table(
identifier=('nyc', 'taxis'),
metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json',
metadata=TableMetadataV2(
location='s3a://warehouse/wh/nyc.db/taxis',
table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'),
last_updated_ms=1662633437826,
last_column_id=19,
schemas=[Schema(
NestedField(field_id=1, name='VendorID', field_type=LongType(), required=False),
NestedField(field_id=2, name='tpep_pickup_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=3, name='tpep_dropoff_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=4, name='passenger_count', field_type=DoubleType(), required=False),
NestedField(field_id=5, name='trip_distance', field_type=DoubleType(), required=False),
NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), required=False),
NestedField(field_id=7, name='store_and_fwd_flag', field_type=StringType(), required=False),
NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
NestedField(field_id=10, name='payment_type', field_type=LongType(), required=False),
NestedField(field_id=11, name='fare_amount', field_type=DoubleType(), required=False),
NestedField(field_id=12, name='extra', field_type=DoubleType(), required=False),
NestedField(field_id=13, name='mta_tax', field_type=DoubleType(), required=False),
NestedField(field_id=14, name='tip_amount', field_type=DoubleType(), required=False),
NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(), required=False),
NestedField(field_id=16, name='improvement_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=17, name='total_amount', field_type=DoubleType(), required=False),
NestedField(field_id=18, name='congestion_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=19, name='airport_fee', field_type=DoubleType(), required=False)
),
schema_id=0,
identifier_field_ids=[]
)],
current_schema_id=0,
partition_specs=[PartitionSpec(spec_id=0)],
default_spec_id=0,
last_partition_id=999,
properties={
'owner': 'root',
'write.format.default': 'parquet'
},
current_snapshot_id=8334458494559715805,
snapshots=[
Snapshot(
snapshot_id=7910949481055846233,
parent_snapshot_id=None,
sequence_number=None,
timestamp_ms=1662489306555,
manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro',
summary=Summary(
Operation.APPEND,
**{
'spark.app.id': 'local-1662489289173',
'added-data-files': '1',
'added-records': '2979431',
'added-files-size': '46600777',
'changed-partition-count': '1',
'total-records': '2979431',
'total-files-size': '46600777',
'total-data-files': '1',
'total-delete-files': '0',
'total-position-deletes': '0',
'total-equality-deletes': '0'
}
),
schema_id=0
),
],
snapshot_log=[
SnapshotLogEntry(
snapshot_id='7910949481055846233',
timestamp_ms=1662489306555
)
],
metadata_log=[
MetadataLogEntry(
metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json',
timestamp_ms=1662489306555
)
],
sort_orders=[SortOrder(order_id=0)],
default_sort_order_id=0,
refs={
'main': SnapshotRef(
snapshot_id=8334458494559715805,
snapshot_ref_type=SnapshotRefType.BRANCH,
min_snapshots_to_keep=None,
max_snapshot_age_ms=None,
max_ref_age_ms=None
)
},
format_version=2,
last_sequence_number=1
)
)
```
This returns a `Table` object that represents the Iceberg table and can be queried and altered.
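
Rather than dumping the full metadata object shown in earlier revisions of this page, you can inspect the pieces you need directly on the returned object. A minimal sketch, assuming the catalog and the `nyc.taxis` table loaded above:

```python
table = catalog.load_table(("nyc", "taxis"))

# Inspect the parts of the table you are interested in.
print(table.schema())            # current schema with all nested fields
print(table.spec())              # current partition spec
print(table.properties)          # e.g. {'owner': 'root', 'write.format.default': 'parquet'}
print(table.current_snapshot())  # snapshot that the main branch currently points to
```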

### Directly from a metadata file

To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:

```python
from pyiceberg.table import StaticTable

table = StaticTable.from_metadata(
    "s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
@@ -241,52 +146,37 @@ catalog.create_table(
)
```

Which returns a newly created table:
### Update table properties

Set and remove properties through the `Transaction` API:

```python
Table(
identifier=('default', 'bids'),
metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json',
metadata=TableMetadataV2(
location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/',
table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'),
last_updated_ms=1661847562069,
last_column_id=4,
schemas=[
Schema(
NestedField(field_id=1, name='datetime', field_type=TimestampType(), required=False),
NestedField(field_id=2, name='bid', field_type=DoubleType(), required=False),
NestedField(field_id=3, name='ask', field_type=DoubleType(), required=False),
NestedField(field_id=4, name='symbol', field_type=StringType(), required=False)),
schema_id=1,
identifier_field_ids=[])
],
current_schema_id=1,
partition_specs=[
PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name='datetime_day'),))
],
default_spec_id=0,
last_partition_id=1000,
properties={},
current_snapshot_id=None,
snapshots=[],
snapshot_log=[],
metadata_log=[],
sort_orders=[
SortOrder(order_id=1, fields=[SortField(source_id=4, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST)])
],
default_sort_order_id=1,
refs={},
format_version=2,
last_sequence_number=0
)
)
with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
    transaction.remove_properties("abc")

assert table.properties == {}
```

Or, without a context manager:

```python
table = table.transaction().set_properties(abc="def").commit_transaction()
assert table.properties == {"abc": "def"}
table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```
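
Under the hood (based on the plumbing added in this commit), committing a transaction builds a `CommitTableRequest` and hands it to the catalog's new `_commit_table()` hook, which returns a `CommitTableResponse` with the fresh metadata. A minimal, hypothetical sketch of the hook a custom catalog would implement — the class name `MyCatalog` is made up for illustration, and the other abstract `Catalog` methods are omitted:

```python
from pyiceberg.catalog import Catalog
from pyiceberg.table import CommitTableRequest, CommitTableResponse


class MyCatalog(Catalog):
    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
        # Check the request's requirements against the current metadata,
        # apply its updates, write a new metadata file, and return the
        # resulting metadata together with its location.
        ...
```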

## Query a table
## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns and optionally a limit and a snapshot ID:
To query a table, a table scan is needed. A table scan accepts a filter, columns, and optionally a limit and a snapshot ID:

```python
from pyiceberg.catalog import load_catalog
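
The rest of this example is truncated in the diff above. For reference, a minimal sketch of such a scan, assuming the `nyc.taxis` table from earlier; the filter value and column names are illustrative:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table(("nyc", "taxis"))

# Filter, project a few columns, and cap the number of rows returned.
scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "trip_distance"),
    limit=100,
)

df = scan.to_arrow()  # or scan.to_pandas()
```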
57 changes: 26 additions & 31 deletions python/poetry.lock

Some generated files are not rendered by default.

23 changes: 21 additions & 2 deletions python/pyiceberg/catalog/__init__.py
@@ -40,7 +40,12 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import Table, TableMetadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
TableMetadata,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
@@ -322,6 +327,20 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: If a table with the name does not exist.
"""

@abstractmethod
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Updates one or more tables.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
Returns:
CommitTableResponse: The updated metadata.
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
"""

@abstractmethod
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -392,7 +411,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper

@abstractmethod
def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: set[str] | None = None, updates: Properties = EMPTY_DICT
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
"""Removes provided property keys and updates properties for a namespace.
17 changes: 16 additions & 1 deletion python/pyiceberg/catalog/dynamodb.py
@@ -52,7 +52,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -168,6 +168,20 @@ def create_table(

return self.load_table(identifier=identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Updates the table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
Returns:
CommitTableResponse: The updated metadata.
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
"""
raise NotImplementedError

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
Loads the table's metadata and returns the table instance.
@@ -577,6 +591,7 @@ def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[st
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
catalog=self,
)


(The remaining changed files are not shown here.)
