fix: read parquet file in chunked mode per row group (#3016)
* fix: read parquet file in chunked mode per row group

* fix: Fix test_empty_parquet

* feat: reduce memory impact of chunked reads

---------

Co-authored-by: Abdel Jaidi <[email protected]>
FredericKayser and jaidisido authored Nov 21, 2024
1 parent 05d0731 commit d485112
Showing 1 changed file with 26 additions and 23 deletions.
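
The heart of the change shows in the diff below: instead of materializing every record batch of a file into a single pyarrow Table before converting to pandas, the reader now converts and yields one batch at a time, and short-circuits files whose metadata reports zero rows. A minimal sketch of that batching pattern in plain pyarrow — the file name and batch size here are illustrative, not taken from the repository:

    import pyarrow as pa
    import pyarrow.parquet as pq

    pq_file = pq.ParquetFile("example.parquet")  # illustrative local file

    # Old pattern: collect all batches into one Table, so peak memory
    # grows with the whole file.
    # table = pa.Table.from_batches(pq_file.iter_batches(batch_size=65_536))

    # New pattern: wrap each RecordBatch in its own single-batch Table and
    # hand it downstream immediately, keeping peak memory near one batch.
    if pq_file.metadata.num_rows > 0:
        for batch in pq_file.iter_batches(batch_size=65_536):
            table = pa.Table.from_batches([batch], schema=pq_file.schema_arrow)
            df = table.to_pandas()  # downstream conversion, one batch at a time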
awswrangler/s3/_read_parquet.py (26 additions, 23 deletions)
@@ -247,33 +247,36 @@ def _read_parquet_chunked(
             if pq_file is None:
                 continue
 
-            use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
-            chunks = pq_file.iter_batches(
-                batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
-            )
-
-            schema = pq_file.schema.to_arrow_schema()
+            metadata = pq_file.metadata
+            schema = metadata.schema.to_arrow_schema()
             if columns:
                 schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
 
-            table = _add_table_partitions(
-                table=pa.Table.from_batches(chunks, schema=schema),
-                path=path,
-                path_root=path_root,
-            )
-            df = _table_to_df(table=table, kwargs=arrow_kwargs)
-            if chunked is True:
-                yield df
+            use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
+            table_kwargs = {"path": path, "path_root": path_root}
+            if metadata.num_rows > 0:
+                for chunk in pq_file.iter_batches(
+                    batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
+                ):
+                    table = _add_table_partitions(table=pa.Table.from_batches([chunk], schema=schema), **table_kwargs)
+                    df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                    if chunked is True:
+                        yield df
+                    else:
+                        if next_slice is not None:
+                            df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
+                        while len(df.index) >= chunked:
+                            yield df.iloc[:chunked, :].copy()
+                            df = df.iloc[chunked:, :]
+                        if df.empty:
+                            next_slice = None
+                        else:
+                            next_slice = df
             else:
-                if next_slice is not None:
-                    df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
-                while len(df.index) >= chunked:
-                    yield df.iloc[:chunked, :].copy()
-                    df = df.iloc[chunked:, :]
-                if df.empty:
-                    next_slice = None
-                else:
-                    next_slice = df
+                table = _add_table_partitions(table=pa.Table.from_batches([], schema=schema), **table_kwargs)
+                df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                yield df
 
     if next_slice is not None:
         yield next_slice
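
For reference, this is roughly how the change surfaces through the library's public API: read_parquet with chunked set returns an iterator of DataFrames rather than one large frame. The bucket/prefix and the process() consumer below are placeholders:

    import awswrangler as wr

    def process(df):  # placeholder consumer
        print(len(df))

    # chunked=True: one DataFrame per record batch, now produced row group by
    # row group; an empty file yields a single empty, correctly-typed frame.
    for df in wr.s3.read_parquet(path="s3://my-bucket/my-prefix/", chunked=True):
        process(df)

    # chunked=<int>: batches are re-sliced to exactly that many rows, with any
    # remainder carried in next_slice and yielded after the last file.
    for df in wr.s3.read_parquet(path="s3://my-bucket/my-prefix/", chunked=100_000):
        process(df)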
