Add RowGroupMetaData information to cudf.io.read_parquet_metadata #15320
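
With this change, `cudf.io.read_parquet_metadata` returns a fourth element: a list with one metadata object per row group. A minimal sketch of the new return value in use (the filename is illustrative; the per-group attributes shown are from pyarrow's `RowGroupMetaData`):

```python
import cudf

# the trailing element holds one RowGroupMetaData object per row group
num_rows, num_row_groups, col_names, row_group_metadata = (
    cudf.io.read_parquet_metadata("example.parquet")
)

for i, rg in enumerate(row_group_metadata):
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes")
```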

6 changes: 5 additions & 1 deletion python/cudf/cudf/io/parquet.py
@@ -276,8 +276,12 @@ def read_parquet_metadata(path):
     num_rows = pq_file.metadata.num_rows
     num_row_groups = pq_file.num_row_groups
     col_names = pq_file.schema.names
+    row_group_metadata = [
+        pq_file.metadata.row_group(row_group_number)
+        for row_group_number in range(num_row_groups)
+    ]

-    return num_rows, num_row_groups, col_names
+    return num_rows, num_row_groups, col_names, row_group_metadata


 @_cudf_nvtx_annotate
29 changes: 25 additions & 4 deletions python/cudf/cudf/tests/test_parquet.py
@@ -415,7 +415,7 @@ def num_row_groups(rows, group_size):
     row_group_size = 5
     pdf.to_parquet(fname, compression="snappy", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _ = cudf.io.read_parquet_metadata(fname)

     assert num_rows == len(pdf.index)
     assert row_groups == num_row_groups(num_rows, row_group_size)
@@ -561,7 +561,7 @@ def test_parquet_read_row_groups(tmpdir, pdf, row_group_size):
     fname = tmpdir.join("row_group.parquet")
     pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _ = cudf.io.read_parquet_metadata(fname)

     gdf = [cudf.read_parquet(fname, row_groups=[i]) for i in range(row_groups)]
     gdf = cudf.concat(gdf)
@@ -586,7 +586,7 @@ def test_parquet_read_row_groups_non_contiguous(tmpdir, pdf, row_group_size):
     fname = tmpdir.join("row_group.parquet")
     pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _ = cudf.io.read_parquet_metadata(fname)

     # alternate rows between the two sources
     gdf = cudf.read_parquet(
@@ -2845,14 +2845,35 @@ def test_to_parquet_row_group_size(
         fname, row_group_size_bytes=size_bytes, row_group_size_rows=size_rows
     )

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _ = cudf.io.read_parquet_metadata(fname)
     # 8 bytes per row, as the column is int64
     expected_num_rows = max(
         math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes)
     )
     assert expected_num_rows == row_groups


+@pytest.mark.parametrize("size_rows", [500_000, 100_000, 10_000])
+def test_parquet_row_group_metadata(tmpdir, large_int64_gdf, size_rows):
+    fname = tmpdir.join("row_group_size.parquet")
+    large_int64_gdf.to_parquet(fname, row_group_size_rows=size_rows)
+
+    # read the file metadata back from the parquet file
+    (
+        num_rows,
+        row_groups,
+        col_names,
+        row_group_metadata,
+    ) = cudf.io.read_parquet_metadata(fname)
+
+    # one metadata object per row group
+    assert len(row_group_metadata) == row_groups
+    # rows in the individual row groups sum to the total row count
+    assert num_rows == sum(
+        row_group.to_dict()["num_rows"] for row_group in row_group_metadata
+    )
+
+
 def test_parquet_reader_decimal_columns():
     df = cudf.DataFrame(
         {
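
The new test leans on pyarrow's `RowGroupMetaData.to_dict()`. The same invariant can be checked with pyarrow alone; a minimal sketch (the filename is illustrative):

```python
import pyarrow.parquet as pq

# cudf hands back the same RowGroupMetaData objects pyarrow produces here
meta = pq.ParquetFile("example.parquet").metadata

# rows in the individual row groups must sum to the file's total row count
assert meta.num_rows == sum(
    meta.row_group(i).to_dict()["num_rows"] for i in range(meta.num_row_groups)
)
```
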
3 changes: 2 additions & 1 deletion python/cudf/cudf/utils/ioutils.py
@@ -101,11 +101,12 @@
 Total number of rows
 Number of row groups
 List of column names
+List of row-group metadata objects

 Examples
 --------
 >>> import cudf
->>> num_rows, num_row_groups, names = cudf.io.read_parquet_metadata(filename)
+>>> num_rows, num_row_groups, names, row_group_metadata = cudf.io.read_parquet_metadata(filename)
 >>> df = [cudf.read_parquet(filename, row_groups=[i]) for i in range(num_row_groups)]
 >>> df = cudf.concat(df)
 >>> df
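
One thing the extra return value enables is pruning reads by row-group size before touching the data. A hedged sketch, not part of this PR (the filename and byte threshold are illustrative; `row_groups` takes a list of row-group indices for a single input file):

```python
import cudf

num_rows, num_row_groups, col_names, row_group_metadata = (
    cudf.io.read_parquet_metadata("example.parquet")
)

# select row groups under an illustrative 1 MB of encoded data
small = [
    i for i, rg in enumerate(row_group_metadata) if rg.total_byte_size < 1_000_000
]
df = cudf.read_parquet("example.parquet", row_groups=small)
```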