controls row groups and empty tables #1782

Merged
8 commits merged on Sep 8, 2024
13 changes: 8 additions & 5 deletions dlt/common/data_writers/buffered.py
@@ -45,7 +45,7 @@ def __init__(
file_max_items: int = None,
file_max_bytes: int = None,
disable_compression: bool = False,
_caps: DestinationCapabilitiesContext = None
_caps: DestinationCapabilitiesContext = None,
):
self.writer_spec = writer_spec
if self.writer_spec.requires_destination_capabilities and not _caps:
@@ -102,13 +102,13 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int

new_rows_count: int
if isinstance(item, List):
# items coming in single list will be written together, not matter how many are there
self._buffered_items.extend(item)
# update row count, if item supports "num_rows" it will be used to count items
if len(item) > 0 and hasattr(item[0], "num_rows"):
new_rows_count = sum(tbl.num_rows for tbl in item)
else:
new_rows_count = len(item)
# items coming in single list will be written together, not matter how many are there
self._buffered_items.extend(item)
else:
self._buffered_items.append(item)
# update row count, if item supports "num_rows" it will be used to count items
@@ -117,8 +117,11 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int
else:
new_rows_count = 1
self._buffered_items_count += new_rows_count
# flush if max buffer exceeded
if self._buffered_items_count >= self.buffer_max_items:
# flush if max buffer exceeded, the second path of the expression prevents empty data frames to pile up in the buffer
if (
self._buffered_items_count >= self.buffer_max_items
or len(self._buffered_items) >= self.buffer_max_items
):
self._flush_items()
Collaborator:

Could we eliminate the comment by refactoring it to a method?

Suggested change
# flush if max buffer exceeded, the second path of the expression prevents empty data frames to pile up in the buffer
if (
self._buffered_items_count >= self.buffer_max_items
or len(self._buffered_items) >= self.buffer_max_items
):
self._flush_items()
self.flush_if_max_buffer_exceeded()

Collaborator:

Similarly, _update_row_count(item) might be neat.

# set last modification date
self._last_modified = time.time()
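For readability, here is a minimal sketch of what the two helpers suggested in the comments above (`flush_if_max_buffer_exceeded` and `_update_row_count`) could look like; both names come from the review suggestions, not from the merged code:

```python
# Hypothetical refactoring sketch based on the review suggestions above; not the merged implementation.
def _update_row_count(self, item: TDataItems) -> int:
    # items coming in a single list are written together; use "num_rows" to count rows when available
    if isinstance(item, list):
        if len(item) > 0 and hasattr(item[0], "num_rows"):
            return sum(tbl.num_rows for tbl in item)
        return len(item)
    return item.num_rows if hasattr(item, "num_rows") else 1


def flush_if_max_buffer_exceeded(self) -> None:
    # the second condition prevents empty tables/batches from piling up in the buffer
    if (
        self._buffered_items_count >= self.buffer_max_items
        or len(self._buffered_items) >= self.buffer_max_items
    ):
        self._flush_items()
```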
32 changes: 24 additions & 8 deletions dlt/common/data_writers/writers.py
@@ -474,17 +474,33 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
def write_data(self, rows: Sequence[Any]) -> None:
from dlt.common.libs.pyarrow import pyarrow

if not rows:
return
# concat batches and tables into a single one, preserving order
# pyarrow writer starts a row group for each item it writes (even with 0 rows)
# it also converts batches into tables internally. by creating a single table
# we allow the user rudimentary control over row group size via max buffered items
batches = []
tables = []
for row in rows:
if not self.writer:
self.writer = self._create_writer(row.schema)
if isinstance(row, pyarrow.Table):
self.writer.write_table(row, row_group_size=self.parquet_row_group_size)
elif isinstance(row, pyarrow.RecordBatch):
self.writer.write_batch(row, row_group_size=self.parquet_row_group_size)
self.items_count += row.num_rows
if isinstance(row, pyarrow.RecordBatch):
Collaborator:

Blocking: I find this surprising. How can a row be a RecordBatch or Table? How can a row have num_rows?
Could we call it item like in the docs?

Collaborator Author:

Right! This is how this class evolved: we started with lists of values to insert and now we deal with tables and other objects. I can rename it to items and type it properly.

Collaborator Author:

Full typing requires using Generic classes, which is a good idea but we have no time to do it now

batches.append(row)
elif isinstance(row, pyarrow.Table):
if batches:
tables.append(pyarrow.Table.from_batches(batches))
batches = []
tables.append(row)
else:
raise ValueError(f"Unsupported type {type(row)}")
# count rows that got written
self.items_count += row.num_rows
if batches:
tables.append(pyarrow.Table.from_batches(batches))

table = pyarrow.concat_tables(tables, promote_options="none")
Collaborator:

I would find it easier to understand if we extract this into a method: self._concat_items(items)

Collaborator Author:

good idea, moved that to libs

if not self.writer:
self.writer = self._create_writer(table.schema)
# write concatenated tables, "none" options ensures 0 copy concat
self.writer.write_table(table, row_group_size=self.parquet_row_group_size)

def write_footer(self) -> None:
if not self.writer:
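As a rough illustration of the "moved that to libs" follow-up above, a helper with the responsibility the reviewer describes could look like the sketch below; the function name and its placement in `dlt.common.libs.pyarrow` are assumptions, not necessarily what was merged:

```python
# Hypothetical standalone sketch of the concatenation helper; the actual dlt code may differ.
from typing import List, Sequence, Union

import pyarrow


def concat_batches_and_tables_in_order(
    items: Sequence[Union[pyarrow.Table, pyarrow.RecordBatch]]
) -> pyarrow.Table:
    """Concatenate a mix of tables and record batches into a single table, preserving order."""
    batches: List[pyarrow.RecordBatch] = []
    tables: List[pyarrow.Table] = []
    for item in items:
        if isinstance(item, pyarrow.RecordBatch):
            batches.append(item)
        elif isinstance(item, pyarrow.Table):
            # flush the collected batches first so the original item order is preserved
            if batches:
                tables.append(pyarrow.Table.from_batches(batches))
                batches = []
            tables.append(item)
        else:
            raise ValueError(f"Unsupported type {type(item)}")
    if batches:
        tables.append(pyarrow.Table.from_batches(batches))
    # "none" promote option keeps the concatenation zero copy for identical schemas
    return pyarrow.concat_tables(tables, promote_options="none")
```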
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/file-formats/parquet.md
@@ -35,6 +35,7 @@ Under the hood, `dlt` uses the [pyarrow parquet writer](https://arrow.apache.org
- `flavor`: Sanitize schema or set other compatibility options to work with various target systems. Defaults to None which is **pyarrow** default.
- `version`: Determine which Parquet logical types are available for use, whether the reduced set from the Parquet 1.x.x format or the expanded logical types added in later format versions. Defaults to "2.6".
- `data_page_size`: Set a target threshold for the approximate encoded size of data pages within a column chunk (in bytes). Defaults to None which is **pyarrow** default.
- `row_group_size`: Set the number of rows in a row group. See the remarks below; `pyarrow` does not handle this setting the way you might expect.
- `timestamp_timezone`: A string specifying timezone, default is UTC.
- `coerce_timestamps`: resolution to which timestamps are coerced; choose from **s**, **ms**, **us**, **ns**
- `allow_truncated_timestamps` - will raise if precision is lost on a truncated timestamp.
@@ -76,3 +77,16 @@ You can generate parquet files without timezone adjustment information in two wa
2. Set the **timestamp_timezone** to an empty string (i.e. `DATA_WRITER__TIMESTAMP_TIMEZONE=""`) to generate a logical type without UTC adjustment.

To the best of our knowledge, arrow will convert your timezone-aware DateTime(s) to UTC and store them in parquet without timezone information.


### Row group size
The `pyarrow` parquet writer writes each item (a table, or a batch that is internally converted to a table) into a separate row group. This may create many small row groups, which is not optimal for certain query engines (e.g. `duckdb` parallelizes over row groups). `dlt` lets you control the row group size by buffering and concatenating (zero copy) tables/batches before they are written. You control that size by setting the buffer size:
```toml
[extract.data_writer]
buffer_max_items=1000000
```
Mind that the buffered tables must be held in memory; 1,000,000 rows as in the example above may take a considerable amount of it.
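For illustration, a configuration that tunes both knobs could look like the sketch below. The `[normalize.data_writer]` section used for `row_group_size` is an assumption made for this example; check where your parquet writer options are configured:

```toml
# section names are illustrative assumptions
[extract.data_writer]
# rows buffered and concatenated into a single parquet write
buffer_max_items=1000000

[normalize.data_writer]
# only takes effect when smaller than the concatenated table being written
row_group_size=500000
```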

`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
Collaborator:

This sentence is not clear to me yet. Is the instruction to the user something like the following?

Suggested change
`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
For the `pyarrow` parquet writer, ensure to have `row_group_size >= buffer_max_items`. Otherwise, your destination might have more row groups than optimal.

Collaborator Author:

Ohhh, so actually the reverse is true: row_group_size < buffer_max_items is needed to have any effect. This is the core of the problem I'm fixing here. pyarrow will create a row group the size of the parquet table being written, or smaller. Btw. other, well designed implementations allow writing batches to the same group; not here.

Collaborator:

Got it now, thanks! Maybe we can give a recommendation like this:

Suggested change
`row_group_size` has limited utility with `pyarrow` writer. It will split large tables into many groups if set below item buffer size.
Setting `row_group_size` has limited utility with the `pyarrow` parquet writer because large source tables can end up fragmented into too many groups.
Thus, we recommend setting `row_group_size < buffer_max_items` only when the write_disposition is `"replace"`.
For all other write dispositions, we recommend the default `row_group_size` to avoid fragmentation.

Collaborator Author:

Row groups do not map to write dispositions like this... I think this is only relevant to advanced users who optimize their parquet files for a particular query engine...
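To make the pyarrow behavior discussed in this thread concrete, here is a small standalone sketch (plain `pyarrow`, not dlt code; the file name and numbers are illustrative): each `write_table` call starts at least one new row group, and `row_group_size` only splits a single call into more groups.

```python
# Standalone illustration of pyarrow.parquet.ParquetWriter row group behavior; not dlt code.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.Table.from_pylist([{"col1": i} for i in range(10)])

with pq.ParquetWriter("demo.parquet", table.schema) as writer:
    # two separate calls produce at least two row groups, regardless of row_group_size
    writer.write_table(table, row_group_size=100)
    writer.write_table(table, row_group_size=100)
    # a row_group_size below the table size splits this single call into several groups
    writer.write_table(table, row_group_size=3)

metadata = pq.ParquetFile("demo.parquet").metadata
print(metadata.num_row_groups)  # 2 + 4 = 6 row groups for 30 rows in total
```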

83 changes: 82 additions & 1 deletion tests/libs/test_parquet_writer.py
@@ -7,7 +7,7 @@

from dlt.common import pendulum, Decimal, json
from dlt.common.configuration import inject_section
from dlt.common.data_writers.writers import ParquetDataWriter
from dlt.common.data_writers.writers import ArrowToParquetWriter, ParquetDataWriter
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.schema.utils import new_column
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
@@ -313,3 +313,84 @@ def _assert_pq_column(col: int, prec: str) -> None:
_assert_pq_column(1, "milliseconds")
_assert_pq_column(2, "microseconds")
_assert_pq_column(3, "nanoseconds")


def test_arrow_parquet_row_group_size() -> None:
import pyarrow as pa

c1 = {"col1": new_column("col1", "bigint")}

id_ = -1

def get_id_() -> int:
nonlocal id_
id_ += 1
return id_

single_elem_table = lambda: pa.Table.from_pylist([{"col1": get_id_()}])
single_elem_batch = lambda: pa.RecordBatch.from_pylist([{"col1": get_id_()}])

with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer:
writer.write_data_item(single_elem_table(), columns=c1)
writer._flush_items()
assert writer._writer.items_count == 1

with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader:
assert reader.num_row_groups == 1
assert reader.metadata.row_group(0).num_rows == 1

# should be packed into a single row group
with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer:
writer.write_data_item(
[
single_elem_table(),
single_elem_batch(),
single_elem_batch(),
single_elem_table(),
single_elem_batch(),
],
columns=c1,
)
writer._flush_items()
assert writer._writer.items_count == 5

with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader:
assert reader.num_row_groups == 1
assert reader.metadata.row_group(0).num_rows == 5

with open(writer.closed_files[0].file_path, "rb") as f:
table = pq.read_table(f)
# all ids are there and in order
assert table["col1"].to_pylist() == list(range(1, 6))

# also pass an empty batch and force it to be written with a separate call to the parquet writer (via buffer_max_items)
with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=1) as writer:
pq_batch = single_elem_batch()
writer.write_data_item(pq_batch, columns=c1)
# writer._flush_items()
# assert writer._writer.items_count == 5
# this will also create arrow schema
print(pq_batch.schema)
writer.write_data_item(pa.RecordBatch.from_pylist([], schema=pq_batch.schema), columns=c1)

with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader:
assert reader.num_row_groups == 2
assert reader.metadata.row_group(0).num_rows == 1
# row group with size 0 for an empty item
assert reader.metadata.row_group(1).num_rows == 0


def test_empty_tables_get_flushed() -> None:
c1 = {"col1": new_column("col1", "bigint")}
single_elem_table = pa.Table.from_pylist([{"col1": 1}])
empty_batch = pa.RecordBatch.from_pylist([], schema=single_elem_table.schema)

with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer:
writer.write_data_item(empty_batch, columns=c1)
writer.write_data_item(empty_batch, columns=c1)
# written
assert len(writer._buffered_items) == 0
writer.write_data_item(empty_batch, columns=c1)
assert len(writer._buffered_items) == 1
writer.write_data_item(single_elem_table, columns=c1)
assert len(writer._buffered_items) == 0
111 changes: 0 additions & 111 deletions tests/libs/test_pyarrow.py

This file was deleted.

Loading