Write support #41

Merged (71 commits, Jan 18, 2024)

Changes shown are from 42 of the 71 commits.

Commits
7133054
WIP: Write
Fokko Oct 3, 2023
ffecf72
Add logic to generate a new snapshot-id
Fokko Oct 3, 2023
25eb597
WIP
Fokko Oct 4, 2023
4cd493e
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 5, 2023
a726b1d
Construct a writer tree
Fokko Oct 4, 2023
b88f736
WIP
Fokko Oct 5, 2023
f53626d
WIP
Fokko Oct 6, 2023
0c665ef
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 9, 2023
3f79dbd
Fix linting
Fokko Oct 9, 2023
02430bb
Make the tests pass
Fokko Oct 9, 2023
eb4dd62
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 10, 2023
c891382
Add support for V2
Fokko Oct 10, 2023
aae5a57
pre-commit
Fokko Oct 10, 2023
cff3a1d
Move things outside of pyarrow.py
Fokko Oct 10, 2023
082387e
Append WIP
Fokko Oct 11, 2023
997b673
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 11, 2023
9d52906
WIP
Fokko Oct 11, 2023
8893cf3
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 11, 2023
55f27c9
Add v1 to v2 promotion tests
Fokko Oct 11, 2023
9a0096b
Add _MergeSnapshots
Fokko Oct 11, 2023
4f5b710
Work on the Summary
Fokko Oct 11, 2023
926d947
Add tests
Fokko Oct 12, 2023
50575a8
Add Snapshot logic and Summary generation
Fokko Oct 12, 2023
5482ae0
WIP
Fokko Oct 13, 2023
2fa01f4
WIP
Fokko Oct 13, 2023
580c824
Cleanup
Fokko Oct 13, 2023
254d7e8
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 15, 2023
f4ae6c5
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 16, 2023
760c0d4
Merge branch 'main' of github.com:apache/iceberg-python into fd-snaps…
Fokko Oct 23, 2023
3dba41a
Refactor it a bit
Fokko Oct 23, 2023
bcc5176
Cleanup
Fokko Oct 23, 2023
6d5fbb1
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Oct 23, 2023
3309129
Merge branch 'main' of github.com:apache/iceberg-python into fd-snaps…
Fokko Oct 25, 2023
12c4699
Comments
Fokko Oct 25, 2023
8ef1a06
Merge branch 'fd-snapshots' of github.com:Fokko/iceberg-python into f…
Fokko Oct 25, 2023
aabfb09
Cleanup
Fokko Oct 25, 2023
149c3ec
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 9, 2023
17fd689
WIP
Fokko Dec 9, 2023
54e36ab
Update poetry
Fokko Dec 9, 2023
ab36ec3
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 12, 2023
d6df342
Cleanup
Fokko Dec 12, 2023
1398a2f
Update error
Fokko Dec 12, 2023
c426068
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 18, 2023
1861647
WIP
Fokko Dec 18, 2023
cebc781
Make the CI happy
Fokko Dec 18, 2023
4d0d11c
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 18, 2023
abda552
Cleanup
Fokko Dec 18, 2023
3cd5829
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 19, 2023
5f86b15
fix
Fokko Dec 19, 2023
5044da6
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 20, 2023
e020efb
Fix test
Fokko Dec 20, 2023
0b42471
Merge branch 'main' into fd-write
Fokko Dec 21, 2023
a41abd0
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Dec 26, 2023
286cf47
Thanks Amogh!
Fokko Dec 26, 2023
559618c
Merge branch 'fd-write' of github.com:Fokko/iceberg-python into fd-write
Fokko Dec 26, 2023
4153e78
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 3, 2024
54e75d6
Make the CI happy
Fokko Jan 3, 2024
158077c
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 11, 2024
bbc0b35
Update lint
Fokko Jan 11, 2024
d441af9
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 12, 2024
e395a8f
Update pyiceberg/table/snapshots.py
Fokko Jan 12, 2024
2a65357
Comments and fixing some bugs
Fokko Jan 15, 2024
a013f35
Merge branch 'fd-write' of github.com:Fokko/iceberg-python into fd-write
Fokko Jan 15, 2024
abc0741
Remove doc
Fokko Jan 15, 2024
b817a15
Fix the tests
Fokko Jan 15, 2024
48ba852
Refactor
Fokko Jan 15, 2024
664e113
Move to fast-appends
Fokko Jan 16, 2024
85ac0eb
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 17, 2024
7e8c04f
Merge branch 'main' of github.com:apache/iceberg-python into fd-write
Fokko Jan 17, 2024
7baf3ec
keep track of deleted files
Fokko Jan 17, 2024
ab020b9
Comments
Fokko Jan 18, 2024
36 changes: 32 additions & 4 deletions poetry.lock


60 changes: 50 additions & 10 deletions pyiceberg/io/pyarrow.py
@@ -103,7 +103,11 @@
OutputFile,
OutputStream,
)
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.manifest import (
DataFile,
DataFileContent,
FileFormat,
)
from pyiceberg.schema import (
PartnerAccessor,
PreOrderSchemaVisitor,
@@ -117,8 +121,9 @@
visit,
visit_with_partner,
)
from pyiceberg.table import WriteTask, _generate_datafile_filename
from pyiceberg.transforms import TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -1445,16 +1450,13 @@ def parquet_path_to_id_mapping(
def fill_parquet_file_metadata(
df: DataFile,
parquet_metadata: pq.FileMetaData,
file_size: int,
stats_columns: Dict[int, StatisticsCollector],
parquet_column_mapping: Dict[str, int],
) -> None:
"""
Compute and fill the following fields of the DataFile object.

- file_format
- record_count
- file_size_in_bytes
- column_sizes
- value_counts
- null_value_counts
@@ -1466,9 +1468,6 @@ def fill_parquet_file_metadata(
Args:
df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
file_size (int): The total compressed file size cannot be retrieved from the metadata and hence has to
be passed here. Depending on the kind of file system and pyarrow library call used, different
ways to obtain this value might be appropriate.
stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
set the mode for column metrics collection
"""
@@ -1565,13 +1564,54 @@ def fill_parquet_file_metadata(
del upper_bounds[field_id]
del null_value_counts[field_id]

df.file_format = FileFormat.PARQUET
df.record_count = parquet_metadata.num_rows
df.file_size_in_bytes = file_size
df.column_sizes = column_sizes
df.value_counts = value_counts
df.null_value_counts = null_value_counts
df.nan_value_counts = nan_value_counts
df.lower_bounds = lower_bounds
df.upper_bounds = upper_bounds
df.split_offsets = split_offsets


def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
task = next(tasks)

try:
_ = next(tasks)
# If there are more tasks, raise an exception
raise ValueError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
except StopIteration:
pass

df = task.df

file_path = f'{table.location()}/data/{_generate_datafile_filename("parquet")}'

Contributor:
Can the _generate_datafile_filename method be a bit more robust? The UUID in the filename is not for uniqueness, it is to identify files from the same write. Here's a breakdown of the format used by Spark:

\d\d\d\d\d-\d-(UUID)-\d.(extension)
^^^^^^^^^^ ^^        ^^
|||||||||| ||        counter to distinguish files from the same task
|||||||||| unique task ID from the process (never repeats even for tasks that are retried)
  5-digit "partition number" that is the ordinal of a task within a Spark stage

Uniqueness is a combination of the task ID, the write-specific UUID, and the file counter for the task. The partition ordinal is used to preserve locality in file names.

It would be nice to expose some of these options in the WriteTask, like a unique UUID, unique task ID, and part ordinal. Then the counter would be handled locally.

Contributor Author (Fokko):
Keep in mind that we write a single file in the first iteration. I've moved the uuid and task-id to the WriteTask.

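A minimal sketch of that naming scheme, assuming a hypothetical WriteTask-like object that carries a per-write UUID and a task ID (WriteTaskSketch, write_uuid, task_id, partition_ordinal and counter are illustrative names, not the actual PyIceberg API):

import uuid
from dataclasses import dataclass


@dataclass
class WriteTaskSketch:
    write_uuid: uuid.UUID       # identifies all files produced by the same write
    task_id: int                # unique task ID within the process
    partition_ordinal: int = 0  # ordinal of the task, preserves locality in file names
    counter: int = 0            # distinguishes files written by the same task

    def generate_data_file_filename(self, extension: str) -> str:
        # Mirrors the Spark-style pattern: 00000-0-<write-uuid>-0.parquet
        name = f"{self.partition_ordinal:05d}-{self.task_id}-{self.write_uuid}-{self.counter}.{extension}"
        self.counter += 1
        return name


task = WriteTaskSketch(write_uuid=uuid.uuid4(), task_id=0)
print(task.generate_data_file_filename("parquet"))  # e.g. 00000-0-<uuid>-0.parquet
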
file_schema = schema_to_pyarrow(table.schema())

Contributor (@jqin61):
Hi Fokko! I am working with @syun64 to test out the upcoming write feature. During the test, we realized the field IDs are not being set in the written Parquet file.
To help illustrate this, I put together a diff against your working branch.

With the current behavior, the field IDs are not written correctly; the Parquet schema looks like:

<pyarrow._parquet.ParquetSchema object at 0x11c40c880>
required group field_id=-1 schema {
  optional binary field_id=-1 id (String);
  optional int32 field_id=-1 date (Date);
}

and after using a different metadata key for the field ID in the Arrow schema, the written Parquet schema looks like:

<pyarrow._parquet.ParquetSchema object at 0x11c40c880>
required group field_id=-1 schema {
  optional binary field_id=1 id (String);
  optional int32 field_id=2 date (Date);
}

We feel this is a peculiarity of pyarrow.parquet.ParquetWriter: the field IDs need to be defined in the metadata of the pyarrow schema under a specific key, "PARQUET:field_id", instead of plain "field_id".
Do you think we should use a different pyarrow schema when we write the Parquet file, with the metadata key prefixed with 'PARQUET:'?

Contributor Author (Fokko):
Thanks @jqin61 for testing this, as it is paramount that the field IDs are written properly. I'm able to reproduce this locally:

 <pyarrow._parquet.ParquetSchema object at 0x138782440>
required group field_id=-1 schema {
  optional double field_id=-1 lat;
  optional double field_id=-1 long;
}

After changing this to PARQUET:field_id, it is indeed fixed:

parq ~/Desktop/00000-0-f4a20311-0574-4d24-8b8e-2cdf747581af-0.parquet --schema 

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x12087e340>
required group field_id=-1 schema {
  optional double field_id=1 lat;
  optional double field_id=2 long;
}

Thanks for flagging this!

Comment (@robtandy):
If I have

df = pa.Table.from_pylist([{'a':"hello"}, {'a':"world"}])

Should I expect df to have a pa.Schema that can be converted with pyarrow_to_schema, even the modified one presented in the diff above? I wasn't able to get either branch to work, since the schema of df above has no metadata for its fields.

Contributor Author (Fokko):
Hey @robtandy, thanks for chiming in here. I think the PyArrow schema should also include the field-id metadata. When you create a new table, it should re-assign the field IDs if they are missing.

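A small standalone check of the behaviour discussed above, using only stock PyArrow (the output path is arbitrary): the Parquet writer only emits field IDs when the Arrow field metadata uses the PARQUET:field_id key, so a sketch of the fix looks like this.

import pyarrow as pa
import pyarrow.parquet as pq

# Attach the field IDs under the metadata key PyArrow recognises when writing Parquet.
schema = pa.schema([
    pa.field("lat", pa.float64(), metadata={b"PARQUET:field_id": b"1"}),
    pa.field("long", pa.float64(), metadata={b"PARQUET:field_id": b"2"}),
])
tbl = pa.Table.from_pylist([{"lat": 52.37, "long": 4.89}], schema=schema)
pq.write_table(tbl, "/tmp/field_id_check.parquet")

# The printed Parquet schema should now show field_id=1 and field_id=2 instead of -1.
print(pq.ParquetFile("/tmp/field_id_check.parquet").schema)
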

collected_metrics: List[pq.FileMetaData] = []
fo = table.io.new_output(file_path)
with fo.create() as fos:
with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
writer.write_table(df)

df = DataFile(

Contributor:
It's confusing that df is used for both the input dataframe and the output data file.

Contributor Author (Fokko):
I agree, we should not do that 👍

content=DataFileContent.DATA,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=len(df),
file_size_in_bytes=len(fo),

Contributor:
This should also come from the write if possible, so we don't have an S3 request here.

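One way to avoid that extra request, sketched with plain PyArrow rather than the PR's IO classes: when the Parquet writer is handed an already-open stream, the stream position after the writer flushes its footer equals the compressed file size (this assumes the concrete output stream exposes tell(), which ordinary files and PyArrow's native streams do).

import pyarrow as pa
import pyarrow.parquet as pq

tbl = pa.Table.from_pylist([{"lat": 52.37, "long": 4.89}])

with open("/tmp/size_check.parquet", "wb") as fos:
    with pq.ParquetWriter(fos, schema=tbl.schema) as writer:
        writer.write_table(tbl)
    # The writer has written the footer but has not closed the stream we handed it,
    # so the current position is the total number of compressed bytes.
    file_size_in_bytes = fos.tell()

print(file_size_in_bytes)
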
# Just copy these from the table for now
sort_order_id=table.sort_order().order_id,

Contributor:
This isn't the sort order of the table at the time the file was written; it is the sort order that was used to write the records in the data file. If we can't guarantee that the records are in this order, we should not apply this metadata.

Contributor:
This should come from the caller (write task?) and default to None.

spec_id=table.spec().spec_id,

Contributor:
Since this is an unpartitioned write, we need to ensure that this is the unpartitioned spec in the table.

Contributor Author (Fokko):
We check if the partition spec is empty:

if len(self.spec().fields) > 0:
    raise ValueError("Currently only unpartitioned tables are supported")

Contributor (@rdblue, Jan 15, 2024):
Since write_file is a public method, we can't guarantee that the caller did this check. I agree that it is safe when called from append or overwrite, but a caller could use this method directly to create a data file for a partitioned table, right?

Wouldn't it be easy to just pass the spec ID and partition tuple (an empty Record) through WriteTask for now? I think it would make sense if a WriteTask were for a single partition.

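A sketch of that suggestion, with hypothetical field names: each WriteTask describes a single partition and carries the spec ID, the partition tuple (an empty Record for unpartitioned writes), and an optional sort order ID, so write_file would not have to read any of these from table-level state.

from dataclasses import dataclass
from typing import Optional

from pyiceberg.typedef import Record


@dataclass(frozen=True)
class PartitionedWriteTaskSketch:
    spec_id: int                         # spec that the produced data file belongs to
    partition: Record                    # Record() for an unpartitioned spec
    sort_order_id: Optional[int] = None  # only set when the rows were written in that order

The DataFile could then be built from task.spec_id, task.partition, and task.sort_order_id instead of table.spec() and table.sort_order(), which also covers the sort-order comment above.
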
equality_ids=table.schema().identifier_field_ids,

Contributor:
Data files are never written with equality_ids. That is only used for equality delete files. It should always be null for data files.

Contributor Author (Fokko):
Ah, I messed up here; I thought it was referring to the identifier_field_ids.

key_metadata=None,
)
fill_parquet_file_metadata(
df=df,
parquet_metadata=collected_metrics[0],
stats_columns=compute_statistics_plan(table.schema(), table.properties),
parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
)
return iter([df])
35 changes: 23 additions & 12 deletions pyiceberg/manifest.py
@@ -37,7 +37,7 @@
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.typedef import EMPTY_DICT, Record
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -60,6 +60,8 @@
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2

INITIAL_SEQUENCE_NUMBER = 0


class DataFileContent(int, Enum):
DATA = 0
@@ -504,7 +506,7 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition
NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
NestedField(503, "added_snapshot_id", LongType(), required=False),
NestedField(503, "added_snapshot_id", LongType(), required=True),
NestedField(504, "added_files_count", IntegerType(), required=False),
NestedField(505, "existing_files_count", IntegerType(), required=False),
NestedField(506, "deleted_files_count", IntegerType(), required=False),
@@ -517,6 +519,7 @@

MANIFEST_FILE_SCHEMA_STRUCT = MANIFEST_FILE_SCHEMA.as_struct()


POSITIONAL_DELETE_SCHEMA = Schema(
NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType())
)
@@ -665,7 +668,9 @@ class ManifestWriter(ABC):
_min_data_sequence_number: Optional[int]
_partitions: List[Record]

def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str]):
def __init__(
self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT
) -> None:
self.closed = False
self._spec = spec
self._schema = schema
@@ -746,7 +751,7 @@ def to_manifest_file(self) -> ManifestFile:
existing_rows_count=self._existing_rows,
deleted_rows_count=self._deleted_rows,
partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
key_metadatas=None,
key_metadata=None,
)

def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
@@ -851,7 +856,7 @@ class ManifestListWriter(ABC):
_commit_snapshot_id: int
_writer: AvroOutputFile[ManifestFile]

def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
def __init__(self, output_file: OutputFile, meta: Dict[str, Any]):
self._output_file = output_file
self._meta = meta
self._manifest_files = []
@@ -884,7 +889,7 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite


class ManifestListWriterV1(ManifestListWriter):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
super().__init__(
output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
)
@@ -897,9 +902,11 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:

class ManifestListWriterV2(ManifestListWriter):
_commit_snapshot_id: int
_sequence_number: int
_sequence_number: Optional[int]

Contributor:
I think this is required for the manifest list. It is always known when we write the manifest list, because manifest lists are written for every commit attempt (after the sequence number has been updated).

Contributor Author (Fokko):
I agree, let me check!


def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int):
def __init__(
self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: Optional[int]
):
super().__init__(
output_file,
{
@@ -920,9 +927,9 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
# To validate this, check that the snapshot id matches the current commit
if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
raise ValueError(
f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}"
f"Found unassigned sequence number for a manifest from snapshot: {self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}"
)
wrapped_manifest_file.sequence_number = self._sequence_number
wrapped_manifest_file.sequence_number = self._sequence_number or INITIAL_SEQUENCE_NUMBER

Contributor:
I don't think this is correct because the sequence number should always be passed in.


if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ:
if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
@@ -931,12 +938,16 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
)
# if the min sequence number is not determined, then there was no assigned sequence number for any file
# written to the wrapped manifest. Replace the unassigned sequence number with the one for this commit
wrapped_manifest_file.min_sequence_number = self._sequence_number
wrapped_manifest_file.min_sequence_number = self._sequence_number or INITIAL_SEQUENCE_NUMBER
return wrapped_manifest_file


def write_manifest_list(
format_version: Literal[1, 2], output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int
format_version: Literal[1, 2],
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: Optional[int],
sequence_number: Optional[int],
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)