From 1e4afd16d470a4fc91986ee0236dc97eabf1f0f1 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Thu, 11 Nov 2021 08:35:08 -0600
Subject: [PATCH] Fix read_parquet bug for extended dtypes from remote storage
 (#9638)

This fixes a `read_parquet` bug discovered while iterating on #9589

Without this fix, the optimized `read_parquet` code path will fail when
the pandas metadata includes index-column information. It may also fail
when the data includes list or struct columns (depending on the engine
that wrote the parquet file).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - https://github.com/brandon-b-miller

URL: https://github.com/rapidsai/cudf/pull/9638
---
 python/cudf/cudf/io/parquet.py    | 32 +++++++++++++++---
 python/cudf/cudf/tests/test_s3.py | 54 +++++++++++++++++++++++++++++++
 2 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index a60ec07b894..302021a082f 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2019-2020, NVIDIA CORPORATION.
 
 import io
+import json
 import warnings
 from collections import defaultdict
 from uuid import uuid4
@@ -209,7 +210,7 @@ def _process_row_groups(paths, fs, filters=None, row_groups=None):
     return file_list, row_groups
 
 
-def _get_byte_ranges(file_list, row_groups, columns, fs):
+def _get_byte_ranges(file_list, row_groups, columns, fs, **kwargs):
 
     # This utility is used to collect the footer metadata
     # from a parquet file. This metadata is used to define
@@ -242,7 +243,7 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
         #
         # This "sample size" can be tunable, but should
         # always be >= 8 bytes (so we can read the footer size)
-        tail_size = min(32_000, file_size)
+        tail_size = min(kwargs.get("footer_sample_size", 32_000), file_size,)
         if fs is None:
             path.seek(file_size - tail_size)
             footer_sample = path.read(tail_size)
@@ -262,6 +263,22 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
         # Step 3 - Collect required byte ranges
         byte_ranges = []
         md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
+        column_set = None if columns is None else set(columns)
+        if column_set is not None:
+            schema = md.schema.to_arrow_schema()
+            has_pandas_metadata = (
+                schema.metadata is not None and b"pandas" in schema.metadata
+            )
+            if has_pandas_metadata:
+                md_index = [
+                    ind
+                    for ind in json.loads(
+                        schema.metadata[b"pandas"].decode("utf8")
+                    ).get("index_columns", [])
+                    # Ignore RangeIndex information
+                    if not isinstance(ind, dict)
+                ]
+                column_set |= set(md_index)
         for r in range(md.num_row_groups):
             # Skip this row-group if we are targetting
             # specific row-groups
@@ -272,11 +289,16 @@ def _get_byte_ranges(file_list, row_groups, columns, fs):
                     name = column.path_in_schema
                     # Skip this column if we are targetting a
                     # specific columns
-                    if columns is None or name in columns:
+                    split_name = name.split(".")[0]
+                    if (
+                        column_set is None
+                        or name in column_set
+                        or split_name in column_set
+                    ):
                         file_offset0 = column.dictionary_page_offset
                         if file_offset0 is None:
                             file_offset0 = column.data_page_offset
-                        num_bytes = column.total_uncompressed_size
+                        num_bytes = column.total_compressed_size
                         byte_ranges.append((file_offset0, num_bytes))
 
         all_byte_ranges.append(byte_ranges)
@@ -352,7 +374,7 @@ def read_parquet(
         )
     ):
         byte_ranges, footers, file_sizes = _get_byte_ranges(
-            filepath_or_buffer, row_groups, columns, fs,
+            filepath_or_buffer, row_groups, columns, fs, **kwargs
        )
 
     filepaths_or_buffers = []
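
The core of the fix is that `_get_byte_ranges` now consults two extra
pieces of footer information: the `b"pandas"` schema metadata, which can
name index columns absent from the user's `columns` selection, and the
dotted `path_in_schema` values that nested leaves carry (e.g. "Struct.a").
A minimal sketch of that footer layout, assuming only pandas and pyarrow
(the frame and column names here are hypothetical, not from the patch):

    import io
    import json

    import pandas as pd
    import pyarrow.parquet as pq

    # Write a frame with a real (non-Range) index and a struct column.
    buf = io.BytesIO()
    pd.DataFrame(
        {"Integer": range(10), "Struct": [{"a": i} for i in range(10)]}
    ).set_index("Integer").to_parquet(buf)
    buf.seek(0)

    md = pq.ParquetFile(buf).metadata
    schema = md.schema.to_arrow_schema()
    pandas_md = json.loads(schema.metadata[b"pandas"].decode("utf8"))

    # A RangeIndex is serialized as a dict (no physical column), so only
    # string entries name on-disk columns that the byte-range collection
    # must also cover.
    print([i for i in pandas_md["index_columns"] if not isinstance(i, dict)])
    # ['Integer']

    # Nested leaves carry dotted paths in the row-group metadata, which
    # is why the patch also matches on name.split(".")[0].
    rg = md.row_group(0)
    print([rg.column(c).path_in_schema for c in range(rg.num_columns)])
    # ['Struct.a', 'Integer']

Without the added lookups, a request for columns=["Struct"] would match
neither the leaf path "Struct.a" nor a serialized index column, so the
corresponding byte ranges would never be fetched.
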
diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py
index ff551ec74ca..dea876891f8 100644
--- a/python/cudf/cudf/tests/test_s3.py
+++ b/python/cudf/cudf/tests/test_s3.py
@@ -124,6 +124,16 @@ def pdf(scope="module"):
     return df
 
 
+@pytest.fixture
+def pdf_ext(scope="module"):
+    size = 100
+    df = pd.DataFrame()
+    df["Integer"] = np.array([i for i in range(size)])
+    df["List"] = [[i] for i in range(size)]
+    df["Struct"] = [{"a": i} for i in range(size)]
+    return df
+
+
 @pytest.mark.parametrize("bytes_per_thread", [32, 1024])
 def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
     # Write to buffer
@@ -253,6 +263,50 @@ def test_read_parquet(
     assert_eq(expect, got2)
 
 
+@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
+@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
+@pytest.mark.parametrize("use_python_file_object", [False, True])
+@pytest.mark.parametrize("index", [None, "Integer"])
+def test_read_parquet_ext(
+    s3_base,
+    s3so,
+    pdf_ext,
+    bytes_per_thread,
+    columns,
+    use_python_file_object,
+    index,
+):
+    fname = "test_parquet_reader_ext.parquet"
+    bname = "parquet"
+    buffer = BytesIO()
+
+    if index:
+        pdf_ext.set_index(index).to_parquet(path=buffer)
+    else:
+        pdf_ext.to_parquet(path=buffer)
+
+    # Check direct path handling
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
+        got1 = cudf.read_parquet(
+            "s3://{}/{}".format(bname, fname),
+            use_python_file_object=use_python_file_object,
+            storage_options=s3so,
+            bytes_per_thread=bytes_per_thread,
+            footer_sample_size=3200,
+            columns=columns,
+        )
+    if index:
+        expect = (
+            pdf_ext.set_index(index)[columns]
+            if columns
+            else pdf_ext.set_index(index)
+        )
+    else:
+        expect = pdf_ext[columns] if columns else pdf_ext
+    assert_eq(expect, got1)
+
+
 @pytest.mark.parametrize("columns", [None, ["Float", "String"]])
 def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
     # Write to buffer
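
For context, the new test drives the code path this patch repairs. A
hypothetical end-user call shaped like the test (bucket and file names
are placeholders, credentials/storage_options omitted;
`footer_sample_size` reaches `_get_byte_ranges` through `**kwargs`):

    import cudf

    df = cudf.read_parquet(
        "s3://bucket/data.parquet",
        columns=["List", "Struct"],    # nested leaves like "Struct.a" now match
        use_python_file_object=False,  # as parametrized in the test above
        footer_sample_size=3200,       # tail bytes sampled for the footer
    )

Before the patch, a call like this could drop a serialized index column
from the byte-range set, and each column chunk was sized with
`total_uncompressed_size` rather than the on-disk `total_compressed_size`.
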