From edf2b4fc18fd261b9cd9287cb627e23aba1d6644 Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Fri, 8 Sep 2023 18:42:56 -0700 Subject: [PATCH] FIX-#6540: Correct handling of range indices in read_parquet This change also fixes #6543. Signed-off-by: Zeb Burke-Conte --- modin/conftest.py | 17 ++ .../io/column_stores/parquet_dispatcher.py | 55 +++- modin/pandas/test/test_io.py | 252 +++++++++++++++++- 3 files changed, 308 insertions(+), 16 deletions(-) diff --git a/modin/conftest.py b/modin/conftest.py index 2cf09bcfe61..d8ab9859181 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -385,6 +385,9 @@ def _make_parquet_file( nrows=NROWS, ncols=2, force=True, + range_index_start=0, + range_index_step=1, + range_index_name=None, partitioned_columns=[], row_group_size: Optional[int] = None, ): @@ -402,6 +405,20 @@ def _make_parquet_file( df = pandas.DataFrame( {f"col{x + 1}": np.arange(nrows) for x in range(ncols)} ) + index = pandas.RangeIndex( + start=range_index_start, + stop=range_index_start + (nrows * range_index_step), + step=range_index_step, + name=range_index_name, + ) + if ( + range_index_start == 0 + and range_index_step == 1 + and range_index_name is None + ): + assert df.index.equals(index) + else: + df.index = index if len(partitioned_columns) > 0: df.to_parquet( filename, diff --git a/modin/core/io/column_stores/parquet_dispatcher.py b/modin/core/io/column_stores/parquet_dispatcher.py index 9816997ddbf..afc22bdd1f1 100644 --- a/modin/core/io/column_stores/parquet_dispatcher.py +++ b/modin/core/io/column_stores/parquet_dispatcher.py @@ -521,16 +521,24 @@ def build_index(cls, dataset, partition_ids, index_columns, filters): See `build_partition` for more detail on the contents of partitions_ids. """ range_index = True + range_index_metadata = None column_names_to_read = [] for column in index_columns: - # According to https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html, - # only RangeIndex will be stored as metadata. Otherwise, the default behavior is - # to store the index as a column. + # https://pandas.pydata.org/docs/development/developer.html#storing-pandas-dataframe-objects-in-apache-parquet-format + # describes the format of the index column metadata. + # It is a list, where each entry is either a string or a dictionary. + # A string means that a column stored in the dataset is (part of) the index. + # A dictionary is metadata about a RangeIndex, which is metadata-only and not stored + # in the dataset as a column. + # There cannot be both for a single dataframe, because a MultiIndex can only contain + # "actual data" columns and not RangeIndex objects. + # See similar code in pyarrow: https://github.com/apache/arrow/blob/44811ba18477560711d512939535c8389dd7787b/python/pyarrow/pandas_compat.py#L912-L926 + # and in fastparquet, here is where RangeIndex is handled: https://github.com/dask/fastparquet/blob/df1219300a96bc1baf9ebad85f4f5676a130c9e8/fastparquet/api.py#L809-L815 if isinstance(column, str): column_names_to_read.append(column) range_index = False - elif column["name"] is not None: - column_names_to_read.append(column["name"]) + elif column["kind"] == "range": + range_index_metadata = column # When the index has meaningful values, stored in a column, we will replicate those # exactly in the Modin dataframe's index. This index may have repeated values, be unsorted, @@ -538,7 +546,7 @@ def build_index(cls, dataset, partition_ids, index_columns, filters): # A range index is the special case: we want the Modin dataframe to have a single range, # not a range that keeps restarting. i.e. if the partitions have index 0-9, 0-19, 0-29, # we want our Modin dataframe to have 0-59. - # When there are no filters, it is relatively trivial to construct the index by + # When there are no filters, it is relatively cheap to construct the index by # actually reading in the necessary data, here in the main process. # When there are filters, we let the workers materialize the indices before combining to # get a single range. @@ -560,12 +568,37 @@ def build_index(cls, dataset, partition_ids, index_columns, filters): if range_index: # There are filters, so we had to materialize in order to # determine how many items there actually are - start = index_objs[0].start - total_length = sum(len(index_part) for index_part in index_objs) - complete_index = pandas.RangeIndex( - start=start, - stop=start + total_length, + total_filtered_length = sum( + len(index_part) for index_part in index_objs ) + + metadata_length_mismatch = False + if range_index_metadata is not None: + metadata_implied_length = ( + range_index_metadata["stop"] - range_index_metadata["start"] + ) / range_index_metadata["step"] + metadata_length_mismatch = ( + total_filtered_length != metadata_implied_length + ) + + # pyarrow ignores the RangeIndex metadata if it is not consistent with data length. + # https://github.com/apache/arrow/blob/44811ba18477560711d512939535c8389dd7787b/python/pyarrow/pandas_compat.py#L924-L926 + # fastparquet keeps the start and step from the metadata and just adjusts to the length. + # https://github.com/dask/fastparquet/blob/df1219300a96bc1baf9ebad85f4f5676a130c9e8/fastparquet/api.py#L815 + if range_index_metadata is None or ( + isinstance(dataset, PyArrowDataset) and metadata_length_mismatch + ): + complete_index = pandas.RangeIndex(total_filtered_length) + else: + complete_index = pandas.RangeIndex( + start=range_index_metadata["start"], + step=range_index_metadata["step"], + stop=( + range_index_metadata["start"] + + (total_filtered_length * range_index_metadata["step"]) + ), + name=range_index_metadata["name"], + ) else: complete_index = index_objs[0].append(index_objs[1:]) return complete_index, range_index or (len(index_columns) == 0) diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 059357686c9..19b0c11291e 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -42,6 +42,7 @@ from modin.pandas.utils import from_arrow from modin.test.test_utils import warns_that_defaulting_to_pandas import pyarrow as pa +import pyarrow.dataset import fastparquet import os from io import BytesIO, StringIO @@ -1453,6 +1454,65 @@ def comparator(df1, df2): comparator=comparator, ) + @pytest.mark.parametrize("columns", [None, ["col1"]]) + @pytest.mark.parametrize( + "filters", + [None, [("col1", "<=", 1_000_000)], [("col1", "<=", 75), ("col2", ">=", 35)]], + ) + @pytest.mark.parametrize( + "range_index_start", + [0, 5_000], + ) + @pytest.mark.parametrize( + "range_index_step", + [1, 10], + ) + @pytest.mark.parametrize( + "range_index_name", + [None, "my_index"], + ) + def test_read_parquet_index( + self, + engine, + make_parquet_file, + columns, + filters, + range_index_start, + range_index_step, + range_index_name, + ): + if engine == "pyarrow" and filters == []: + # pyarrow, and therefore pandas using pyarrow, errors in this case. + # Modin correctly replicates this behavior; however error cases + # cause race conditions with ensure_clean on Windows. + # TODO: Remove this once https://github.com/modin-project/modin/issues/6460 is fixed. + pytest.xfail( + "Skipping empty filters error case to avoid race condition - see #6460" + ) + + with ensure_clean(".parquet") as unique_filename: + make_parquet_file( + filename=unique_filename, + range_index_start=range_index_start, + range_index_step=range_index_step, + range_index_name=range_index_name, + row_group_size=100, + ) + + def comparator(df1, df2): + df_equals(df1, df2) + df_equals(df1.dtypes, df2.dtypes) + + eval_io( + fn_name="read_parquet", + # read_parquet kwargs + engine=engine, + path=unique_filename, + columns=columns, + filters=filters, + comparator=comparator, + ) + def test_read_parquet_list_of_files_5698(self, engine, make_parquet_file): if engine == "fastparquet" and os.name == "nt": pytest.xfail(reason="https://github.com/pandas-dev/pandas/issues/51720") @@ -1479,13 +1539,41 @@ def test_read_parquet_indexing_by_column(self, tmp_path, engine, make_parquet_fi parquet_df[col] @pytest.mark.parametrize("columns", [None, ["col1"]]) + @pytest.mark.parametrize( + "filters", + [None, [("col1", "<=", 3_215), ("col2", ">=", 35)]], + ) @pytest.mark.parametrize("row_group_size", [None, 100, 1000, 10_000]) @pytest.mark.parametrize( "rows_per_file", [[1000] * 40, [0, 0, 40_000], [10_000, 10_000] + [100] * 200] ) @pytest.mark.exclude_in_sanity def test_read_parquet_directory( - self, engine, make_parquet_dir, columns, row_group_size, rows_per_file + self, engine, make_parquet_dir, columns, filters, row_group_size, rows_per_file + ): + self._test_read_parquet_directory( + engine=engine, + make_parquet_dir=make_parquet_dir, + columns=columns, + filters=filters, + range_index_start=0, + range_index_step=1, + range_index_name=None, + row_group_size=row_group_size, + rows_per_file=rows_per_file, + ) + + def _test_read_parquet_directory( + self, + engine, + make_parquet_dir, + columns, + filters, + range_index_start, + range_index_step, + range_index_name, + row_group_size, + rows_per_file, ): num_cols = DATASET_SIZE_DICT.get( TestDatasetSize.get(), DATASET_SIZE_DICT["Small"] @@ -1494,9 +1582,25 @@ def test_read_parquet_directory( start_row = 0 for i, length in enumerate(rows_per_file): end_row = start_row + length - dfs_by_filename[f"{i}.parquet"] = pandas.DataFrame( - {f"col{x + 1}": np.arange(start_row, end_row) for x in range(num_cols)} + df = pandas.DataFrame( + {f"col{x + 1}": np.arange(start_row, end_row) for x in range(num_cols)}, ) + index = pandas.RangeIndex( + start=range_index_start, + stop=range_index_start + (length * range_index_step), + step=range_index_step, + name=range_index_name, + ) + if ( + range_index_start == 0 + and range_index_step == 1 + and range_index_name is None + ): + assert df.index.equals(index) + else: + df.index = index + + dfs_by_filename[f"{i}.parquet"] = df start_row = end_row path = make_parquet_dir(dfs_by_filename, row_group_size) @@ -1513,6 +1617,123 @@ def test_read_parquet_directory( engine=engine, path=path, columns=columns, + filters=filters, + ) + + @pytest.mark.parametrize( + "filters", + [None, [("col1", "<=", 1_000_000)], [("col1", "<=", 75), ("col2", ">=", 35)]], + ) + @pytest.mark.parametrize( + "range_index_start", + [0, 5_000], + ) + @pytest.mark.parametrize( + "range_index_step", + [1, 10], + ) + @pytest.mark.parametrize( + "range_index_name", + [None, "my_index"], + ) + @pytest.mark.parametrize("row_group_size", [None, 20]) + def test_read_parquet_directory_range_index( + self, + engine, + make_parquet_dir, + filters, + range_index_start, + range_index_step, + range_index_name, + row_group_size, + ): + self._test_read_parquet_directory( + engine=engine, + make_parquet_dir=make_parquet_dir, + columns=None, + filters=filters, + range_index_start=range_index_start, + range_index_step=range_index_step, + range_index_name=range_index_name, + row_group_size=row_group_size, + # We don't vary rows_per_file, but we choose a + # tricky option: uneven with some empty files, + # none divisible by the row_group_size. + # We use a smaller total size than in other tests + # to make this test run faster. + rows_per_file=([250] + [0] * 10 + [25] * 10), + ) + + @pytest.mark.parametrize( + "filters", + [None, [("col1", "<=", 1_000_000)], [("col1", "<=", 75), ("col2", ">=", 35)]], + ) + @pytest.mark.parametrize( + "range_index_start", + [0, 5_000], + ) + @pytest.mark.parametrize( + "range_index_step", + [1, 10], + ) + @pytest.mark.parametrize( + "range_index_name", + [None, "my_index"], + ) + def test_read_parquet_directory_range_index_consistent_metadata( + self, + engine, + filters, + range_index_start, + range_index_step, + range_index_name, + tmp_path, + ): + num_cols = DATASET_SIZE_DICT.get( + TestDatasetSize.get(), DATASET_SIZE_DICT["Small"] + ) + df = pandas.DataFrame( + {f"col{x + 1}": np.arange(0, 500) for x in range(num_cols)}, + ) + index = pandas.RangeIndex( + start=range_index_start, + stop=range_index_start + (len(df) * range_index_step), + step=range_index_step, + name=range_index_name, + ) + if ( + range_index_start == 0 + and range_index_step == 1 + and range_index_name is None + ): + assert df.index.equals(index) + else: + df.index = index + + path = get_unique_filename(extension=None, data_dir=tmp_path) + + table = pa.Table.from_pandas(df) + pyarrow.dataset.write_dataset( + table, + path, + format="parquet", + max_rows_per_group=35, + max_rows_per_file=100, + ) + + # There are specific files that PyArrow will try to ignore by default + # in a parquet directory. One example are files that start with '_'. Our + # previous implementation tried to read all files in a parquet directory, + # but we now make use of PyArrow to ensure the directory is valid. + with open(os.path.join(path, "_committed_file"), "w+") as f: + f.write("testingtesting") + + eval_io( + fn_name="read_parquet", + # read_parquet kwargs + engine=engine, + path=path, + filters=filters, ) @pytest.mark.parametrize("columns", [None, ["col1"]]) @@ -1520,11 +1741,32 @@ def test_read_parquet_directory( "filters", [None, [], [("col1", "==", 5)], [("col1", "<=", 215), ("col2", ">=", 35)]], ) + @pytest.mark.parametrize( + "range_index_start", + [0, 5_000], + ) + @pytest.mark.parametrize( + "range_index_step", + [1, 10], + ) def test_read_parquet_partitioned_directory( - self, tmp_path, make_parquet_file, columns, filters, engine + self, + tmp_path, + make_parquet_file, + columns, + filters, + range_index_start, + range_index_step, + engine, ): unique_filename = get_unique_filename(extension=None, data_dir=tmp_path) - make_parquet_file(filename=unique_filename, partitioned_columns=["col1"]) + make_parquet_file( + filename=unique_filename, + partitioned_columns=["col1"], + range_index_start=range_index_start, + range_index_step=range_index_step, + range_index_name="my_index", + ) eval_io( fn_name="read_parquet",