Fix read_parquet bug for extended dtypes from remote storage (#9638)
This fixes a `read_parquet` bug discovered while iterating on #9589

Without this fix, the optimized `read_parquet` code path will fail when the pandas metadata includes index-column information. It may also fail when the data includes list or struct columns (depending on the engine that wrote the parquet file).
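
A minimal sketch (not part of this change) of where that index-column information lives; it assumes pandas and pyarrow are installed, and the index name is illustrative. Because a named index is stored as a regular column, a byte-range projection built only from the user-supplied `columns` list would skip it, which is what this fix addresses by adding the metadata index columns to the projected set:

```python
# Sketch: inspect the "pandas" metadata embedded in a parquet footer.
# The column/index names here are hypothetical examples.
import io
import json

import pandas as pd
import pyarrow.parquet as pq

buf = io.BytesIO()
pd.DataFrame(
    {"a": [1, 2, 3]},
    index=pd.Index([10, 20, 30], name="my_index"),
).to_parquet(buf)

schema = pq.ParquetFile(io.BytesIO(buf.getvalue())).schema.to_arrow_schema()
pandas_md = json.loads(schema.metadata[b"pandas"].decode("utf8"))

# A named index appears here as a plain column name (e.g. ["my_index"]);
# a RangeIndex is stored as a dict with no backing column data, which is
# why dict entries can be ignored when collecting byte ranges.
print(pandas_md.get("index_columns", []))
```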

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - https://github.com/brandon-b-miller

URL: #9638
rjzamora authored Nov 11, 2021
1 parent 9f9a377 commit 1e4afd1
Showing 2 changed files with 81 additions and 5 deletions.
32 changes: 27 additions & 5 deletions python/cudf/cudf/io/parquet.py
@@ -1,6 +1,7 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.

import io
import json
import warnings
from collections import defaultdict
from uuid import uuid4
@@ -209,7 +210,7 @@ def _process_row_groups(paths, fs, filters=None, row_groups=None):
return file_list, row_groups


def _get_byte_ranges(file_list, row_groups, columns, fs):
def _get_byte_ranges(file_list, row_groups, columns, fs, **kwargs):

# This utility is used to collect the footer metadata
# from a parquet file. This metadata is used to define
@@ -242,7 +243,7 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
#
# This "sample size" can be tunable, but should
# always be >= 8 bytes (so we can read the footer size)
tail_size = min(32_000, file_size)
tail_size = min(kwargs.get("footer_sample_size", 32_000), file_size,)
if fs is None:
path.seek(file_size - tail_size)
footer_sample = path.read(tail_size)
@@ -262,6 +263,22 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
# Step 3 - Collect required byte ranges
byte_ranges = []
md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
column_set = None if columns is None else set(columns)
if column_set is not None:
schema = md.schema.to_arrow_schema()
has_pandas_metadata = (
schema.metadata is not None and b"pandas" in schema.metadata
)
if has_pandas_metadata:
md_index = [
ind
for ind in json.loads(
schema.metadata[b"pandas"].decode("utf8")
).get("index_columns", [])
# Ignore RangeIndex information
if not isinstance(ind, dict)
]
column_set |= set(md_index)
for r in range(md.num_row_groups):
# Skip this row-group if we are targeting
# specific row-groups
@@ -272,11 +289,16 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
name = column.path_in_schema
# Skip this column if we are targeting
# specific columns
if columns is None or name in columns:
split_name = name.split(".")[0]
if (
column_set is None
or name in column_set
or split_name in column_set
):
file_offset0 = column.dictionary_page_offset
if file_offset0 is None:
file_offset0 = column.data_page_offset
num_bytes = column.total_uncompressed_size
num_bytes = column.total_compressed_size
byte_ranges.append((file_offset0, num_bytes))

all_byte_ranges.append(byte_ranges)
@@ -352,7 +374,7 @@ def read_parquet(
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
filepath_or_buffer, row_groups, columns, fs, **kwargs
)

filepaths_or_buffers = []
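For context, the optimized path samples only the tail of each remote file, parses the parquet footer from that sample, and then transfers just the byte ranges of the required column chunks. A standalone sketch of that idea, assuming a local file and a footer that fits in the sample (the real code also handles fsspec filesystems and row-group filtering):

```python
# Sketch of the footer-sampling approach used by _get_byte_ranges.
import io

import pyarrow.parquet as pq


def sample_byte_ranges(path, columns=None, footer_sample_size=32_000):
    with open(path, "rb") as f:
        f.seek(0, 2)
        file_size = f.tell()
        # The last 8 bytes of a parquet file hold the footer length and
        # the "PAR1" magic, so the tail sample must be at least 8 bytes.
        tail_size = min(footer_sample_size, file_size)
        f.seek(file_size - tail_size)
        footer_sample = f.read(tail_size)

    # Parse the footer metadata from the sampled bytes.
    md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata

    column_set = None if columns is None else set(columns)
    byte_ranges = []
    for rg in range(md.num_row_groups):
        row_group = md.row_group(rg)
        for c in range(row_group.num_columns):
            column = row_group.column(c)
            name = column.path_in_schema
            # A nested (list/struct) field shows up as e.g. "Struct.a";
            # match it against the user-facing top-level column name.
            if (
                column_set is None
                or name in column_set
                or name.split(".")[0] in column_set
            ):
                offset = column.dictionary_page_offset
                if offset is None:
                    offset = column.data_page_offset
                byte_ranges.append((offset, column.total_compressed_size))
    return byte_ranges
```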
54 changes: 54 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -124,6 +124,16 @@ def pdf(scope="module"):
return df


@pytest.fixture
def pdf_ext(scope="module"):
size = 100
df = pd.DataFrame()
df["Integer"] = np.array([i for i in range(size)])
df["List"] = [[i] for i in range(size)]
df["Struct"] = [{"a": i} for i in range(size)]
return df


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
@@ -253,6 +263,50 @@ def test_read_parquet(
assert_eq(expect, got2)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("index", [None, "Integer"])
def test_read_parquet_ext(
s3_base,
s3so,
pdf_ext,
bytes_per_thread,
columns,
use_python_file_object,
index,
):
fname = "test_parquet_reader_ext.parquet"
bname = "parquet"
buffer = BytesIO()

if index:
pdf_ext.set_index(index).to_parquet(path=buffer)
else:
pdf_ext.to_parquet(path=buffer)

# Check direct path handling
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
footer_sample_size=3200,
columns=columns,
)
if index:
expect = (
pdf_ext.set_index(index)[columns]
if columns
else pdf_ext.set_index(index)
)
else:
expect = pdf_ext[columns] if columns else pdf_ext
assert_eq(expect, got1)


@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
# Write to buffer
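A hypothetical usage sketch mirroring the new test: reading a parquet file that has list/struct columns and a named index from S3 through the optimized path. The bucket, key, and credential settings are illustrative; `footer_sample_size` and `bytes_per_thread` are forwarded to the byte-range collection and transfer steps:

```python
import cudf

# Hypothetical bucket/key; credentials are assumed to come from
# storage_options or the environment.
df = cudf.read_parquet(
    "s3://my-bucket/extended_dtypes.parquet",
    columns=["List", "Struct"],
    storage_options={"anon": False},
    bytes_per_thread=1024,
    # Tail bytes sampled per file when reading the footer metadata;
    # the default is 32_000.
    footer_sample_size=3200,
)
```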
