Add class to write Avro files and add PoC update_table api call #377
base: fd-avro
Conversation
@@ -199,3 +202,76 @@ def __next__(self) -> D:

    def _read_header(self) -> AvroFileHeader:
        return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)
With this class I can write Avro files like this:

with avro.AvroOutputFile(PyArrowFileIO().new_output("writetest.avro"), MANIFEST_ENTRY_SCHEMA, "manifest_entry") as out:
    out.write_block([entry])

But perhaps we could also refactor the AvroFile and AvroOutputFile classes into a single RW class.
        if field.required:
            return field_result
        else:
            return OptionWriter(field_result)
Without this change the avro file structure was the same as for a required field, which can't be parsed as an optional field.
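For context, an optional Iceberg field maps to an Avro union of null and the field's type, and a union value is encoded as the branch index followed by the value itself. A minimal sketch of that encoding (just an illustration built on the encoder.write_int call visible in the diffs, not the exact pyiceberg writer):

class OptionWriter:
    def __init__(self, inner_writer):
        self.inner_writer = inner_writer

    def write(self, encoder, val) -> None:
        # Avro unions are encoded as the zero-based branch index, then the value.
        if val is None:
            encoder.write_int(0)  # branch 0: null, no value follows
        else:
            encoder.write_int(1)  # branch 1: the field's actual type
            self.inner_writer.write(encoder, val)

Without that branch index, a reader expecting the union layout has no way to tell whether the value is present.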
        self.key_writer.write(encoder, k)
        self.value_writer.write(encoder, v)
+    if len(val) > 0:
+        encoder.write_int(0)
From https://avro.apache.org/docs/1.11.1/specification/#schema-maps : "A block with count zero indicates the end of the map."
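To illustrate why the terminator matters, here is a sketch of the reading side (the loop the spec describes, not the pyiceberg reader; the decoder interface is assumed):

def read_map_blocks(decoder, read_key, read_value) -> dict:
    result = {}
    while True:
        count = decoder.read_int()
        if count == 0:
            # "A block with count zero indicates the end of the map."
            return result
        for _ in range(count):
            key = read_key(decoder)
            result[key] = read_value(decoder)

A non-empty map written without the trailing zero leaves this loop consuming whatever bytes come next as another block.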
@@ -184,6 +184,8 @@ def write(self, encoder: BinaryEncoder, val: List[Any]) -> None:
        encoder.write_int(len(val))
        for v in val:
            self.element_writer.write(encoder, v)
+        if len(val) > 0:
+            encoder.write_int(0)
From https://avro.apache.org/docs/1.11.1/specification/#arrays-1 : "A block with count zero indicates the end of the array."
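As a concrete check of the wire format this produces, here is the expected byte layout of a small long array, using Avro's zig-zag varint encoding (a standalone sketch, not pyiceberg code; non-negative values only for brevity):

def zigzag(n: int) -> int:
    # Zig-zag maps signed to unsigned; for non-negative n this is just n * 2.
    return (n << 1) ^ (n >> 63)

def encode_long(n: int) -> bytes:
    z = zigzag(n)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

# The array [1, 2]: block count, the two items, then the count-zero terminator.
encoded = encode_long(2) + encode_long(1) + encode_long(2) + encode_long(0)
assert encoded == b"\x04\x02\x04\x00"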
@@ -100,14 +100,14 @@ def __repr__(self) -> str:
        field_id=108,
        name="column_sizes",
        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
-       required=True,
+       required=False,
See PR apache#7796
doc="Map of column id to total size on disk", | ||
), | ||
NestedField( | ||
field_id=109, | ||
name="value_counts", | ||
field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()), | ||
required=True, | ||
required=False, |
Same as above
@@ -192,7 +192,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
    NestedField(1, "snapshot_id", LongType(), required=False),
    NestedField(3, "sequence_number", LongType(), required=False),
    NestedField(4, "file_sequence_number", LongType(), required=False),
-   NestedField(2, "data_file", DATA_FILE_TYPE, required=False),
+   NestedField(2, "data_file", DATA_FILE_TYPE, required=True),
Same as above
@@ -76,7 +76,7 @@

class AvroSchemaConversion:
-   def avro_to_iceberg(self, avro_schema: dict[str, Any]) -> Schema:
+   def avro_to_iceberg(self, avro_schema: Dict[str, Any]) -> Schema:
With Python 3.8, dict wasn't recognized as a type annotation, so this switches to typing.Dict.
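For reference (my note, not from the PR): built-in generics such as dict[str, Any] are only valid as annotations from Python 3.9 on (PEP 585), unless annotation evaluation is deferred, while typing.Dict also works on 3.8. A small illustration (convert is a placeholder name):

from typing import Any, Dict

# Accepted on Python 3.8 and later:
def convert(avro_schema: Dict[str, Any]) -> None:
    ...

# dict[str, Any] as an annotation needs Python 3.9+ (PEP 585), or
# `from __future__ import annotations` at the top of the module on 3.8:
# def convert(avro_schema: dict[str, Any]) -> None: ...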
@@ -335,6 +428,70 @@ def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type

        raise exception(response) from exc
This function is basically a PoC now. Since there are many table update operations, we should look at the Java API to see if it should be a single function with complex parameters or several simpler functions.
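For comparison, here is a rough sketch of the kind of commit request the REST catalog accepts, written against the Iceberg REST spec as I read it rather than against this PR's update_table; the URL, table name, and payload fields are all assumptions:

import requests

catalog_uri = "http://localhost:8181/v1"  # assumed local REST catalog
payload = {
    "requirements": [
        # Abort the commit if another writer moved the branch in the meantime.
        {"type": "assert-ref-snapshot-id", "ref": "main", "snapshot-id": 123},
    ],
    "updates": [
        {
            "action": "add-snapshot",
            "snapshot": {
                "snapshot-id": 456,
                "timestamp-ms": 1700000000000,
                "manifest-list": "s3://bucket/metadata/snap-456.avro",
                "summary": {"operation": "append"},
            },
        },
        {"action": "set-snapshot-ref", "ref-name": "main", "type": "branch", "snapshot-id": 456},
    ],
}
response = requests.post(f"{catalog_uri}/namespaces/default/tables/my_table", json=payload)
response.raise_for_status()

Whether update_table should expose something this low-level or wrap each update action in its own method is exactly the question about the Java API above.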
@@ -155,3 +156,6 @@ def __eq__(self, other: Any) -> bool:

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]"
+
+    def fields(self) -> List[str]:
+        return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name.values()]
I couldn't find a more elegant way to return an array of fields preserving declaration order and preserving None for unset optional fields.
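One small simplification that might read a bit cleaner, to be dropped into the same class (same assumptions as the diff: unset optional fields simply have no attribute on the instance, and _position_to_field_name preserves declaration order):

from typing import Any, List

def fields(self) -> List[Any]:
    # getattr with a default collapses the hasattr check while keeping
    # declaration order and None for unset optional fields.
    return [getattr(self, name, None) for name in self._position_to_field_name.values()]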
@maxdebayser May I make the following suggestion:
Sure. Just to be clear, should I create a new branch off the apache/iceberg master, cherry-pick the work from this branch over there, and add the read/write tests?
I would suggest the following:
Hi @Fokko, I was trying to implement writing to Iceberg from Python without using pyspark when I found your PR, which already implements some of the required steps. With the additions in this PR, I can build the Avro files, upload them to S3, and call the API to update the table metadata to point to the metadata and data.
This PR is by no means complete, but I hope to get some feedback on it, as I'm keen on having this feature in the pyiceberg library.