FIX-modin-project#6540: Correct handling of range indices and index names in read_parquet (modin-project#6545)

This change also fixes modin-project#6543.

Signed-off-by: Zeb Burke-Conte <[email protected]>
zmbc committed Sep 19, 2023
1 parent 70b3a26 commit 8813981
Showing 3 changed files with 305 additions and 29 deletions.
17 changes: 17 additions & 0 deletions modin/conftest.py
@@ -444,6 +444,9 @@ def _make_parquet_file(
     nrows=NROWS,
     ncols=2,
     force=True,
+    range_index_start=0,
+    range_index_step=1,
+    range_index_name=None,
     partitioned_columns=[],
     row_group_size: Optional[int] = None,
 ):
@@ -461,6 +464,20 @@ def _make_parquet_file(
     df = pandas.DataFrame(
         {f"col{x + 1}": np.arange(nrows) for x in range(ncols)}
     )
+    index = pandas.RangeIndex(
+        start=range_index_start,
+        stop=range_index_start + (nrows * range_index_step),
+        step=range_index_step,
+        name=range_index_name,
+    )
+    if (
+        range_index_start == 0
+        and range_index_step == 1
+        and range_index_name is None
+    ):
+        assert df.index.equals(index)
+    else:
+        df.index = index
     if len(partitioned_columns) > 0:
         df.to_parquet(
             filename,
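For illustration, a test could exercise the new fixture parameters like this. This is a minimal sketch, not part of the commit: the make_parquet_file fixture name (assumed to be the pytest wrapper around _make_parquet_file), the filename parameter, and the test body are assumptions.

# Hypothetical test using the extended fixture; everything except the new
# range_index_* parameters is an assumption about the surrounding test suite.
import pandas

import modin.pandas as pd


def test_read_parquet_nondefault_range_index(make_parquet_file, tmp_path):
    path = str(tmp_path / "range_index.parquet")
    # Write a parquet file whose RangeIndex starts at 10, steps by 2, and has a name.
    make_parquet_file(
        filename=path,
        range_index_start=10,
        range_index_step=2,
        range_index_name="my_index",
    )
    # Modin should reconstruct the same RangeIndex that pandas does.
    assert pd.read_parquet(path).index.equals(pandas.read_parquet(path).index)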
55 changes: 44 additions & 11 deletions modin/core/io/column_stores/parquet_dispatcher.py
@@ -513,24 +513,32 @@ def build_index(cls, dataset, partition_ids, index_columns, filters):
         See `build_partition` for more detail on the contents of partition_ids.
         """
         range_index = True
+        range_index_metadata = None
         column_names_to_read = []
         for column in index_columns:
-            # According to https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html,
-            # only RangeIndex will be stored as metadata. Otherwise, the default behavior is
-            # to store the index as a column.
+            # https://pandas.pydata.org/docs/development/developer.html#storing-pandas-dataframe-objects-in-apache-parquet-format
+            # describes the format of the index column metadata.
+            # It is a list, where each entry is either a string or a dictionary.
+            # A string means that a column stored in the dataset is (part of) the index.
+            # A dictionary is metadata about a RangeIndex, which is metadata-only and not stored
+            # in the dataset as a column.
+            # There cannot be both for a single dataframe, because a MultiIndex can only contain
+            # "actual data" columns and not RangeIndex objects.
+            # See similar code in pyarrow: https://github.com/apache/arrow/blob/44811ba18477560711d512939535c8389dd7787b/python/pyarrow/pandas_compat.py#L912-L926
+            # and in fastparquet, here is where RangeIndex is handled: https://github.com/dask/fastparquet/blob/df1219300a96bc1baf9ebad85f4f5676a130c9e8/fastparquet/api.py#L809-L815
             if isinstance(column, str):
                 column_names_to_read.append(column)
                 range_index = False
-            elif column["name"] is not None:
-                column_names_to_read.append(column["name"])
+            elif column["kind"] == "range":
+                range_index_metadata = column

         # When the index has meaningful values, stored in a column, we will replicate those
         # exactly in the Modin dataframe's index. This index may have repeated values, be unsorted,
         # etc. This is all fine.
         # A range index is the special case: we want the Modin dataframe to have a single range,
         # not a range that keeps restarting. i.e. if the partitions have index 0-9, 0-19, 0-29,
         # we want our Modin dataframe to have 0-59.
-        # When there are no filters, it is relatively trivial to construct the index by
+        # When there are no filters, it is relatively cheap to construct the index by
         # actually reading in the necessary data, here in the main process.
         # When there are filters, we let the workers materialize the indices before combining to
         # get a single range.
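To make the metadata format described in the new comment concrete, here is a short sketch of inspecting that metadata directly. It assumes only a scratch file named example.parquet written by pandas with the pyarrow engine; it is not code from this commit.

# Sketch: inspect the pandas index metadata stored in a parquet file.
import json

import pandas
import pyarrow.parquet as pq

pandas.DataFrame({"col1": range(5)}).to_parquet("example.parquet")

pandas_metadata = json.loads(pq.read_schema("example.parquet").metadata[b"pandas"])
print(pandas_metadata["index_columns"])
# A default RangeIndex is stored as metadata only, so this prints something like:
#   [{'kind': 'range', 'name': None, 'start': 0, 'stop': 5, 'step': 1}]
# If the index were stored as an actual column (e.g. after df.set_index("col1")),
# the entry would instead be a string naming that column:
#   ['col1']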
@@ -552,12 +560,37 @@ def build_index(cls, dataset, partition_ids, index_columns, filters):
             if range_index:
                 # There are filters, so we had to materialize in order to
                 # determine how many items there actually are
-                start = index_objs[0].start
-                total_length = sum(len(index_part) for index_part in index_objs)
-                complete_index = pandas.RangeIndex(
-                    start=start,
-                    stop=start + total_length,
+                total_filtered_length = sum(
+                    len(index_part) for index_part in index_objs
                 )
+
+                metadata_length_mismatch = False
+                if range_index_metadata is not None:
+                    metadata_implied_length = (
+                        range_index_metadata["stop"] - range_index_metadata["start"]
+                    ) / range_index_metadata["step"]
+                    metadata_length_mismatch = (
+                        total_filtered_length != metadata_implied_length
+                    )
+
+                # pyarrow ignores the RangeIndex metadata if it is not consistent with data length.
+                # https://github.com/apache/arrow/blob/44811ba18477560711d512939535c8389dd7787b/python/pyarrow/pandas_compat.py#L924-L926
+                # fastparquet keeps the start and step from the metadata and just adjusts to the length.
+                # https://github.com/dask/fastparquet/blob/df1219300a96bc1baf9ebad85f4f5676a130c9e8/fastparquet/api.py#L815
+                if range_index_metadata is None or (
+                    isinstance(dataset, PyArrowDataset) and metadata_length_mismatch
+                ):
+                    complete_index = pandas.RangeIndex(total_filtered_length)
+                else:
+                    complete_index = pandas.RangeIndex(
+                        start=range_index_metadata["start"],
+                        step=range_index_metadata["step"],
+                        stop=(
+                            range_index_metadata["start"]
+                            + (total_filtered_length * range_index_metadata["step"])
+                        ),
+                        name=range_index_metadata["name"],
+                    )
             else:
                 complete_index = index_objs[0].append(index_objs[1:])
         return complete_index, range_index or (len(index_columns) == 0)
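The branch added above reduces to simple arithmetic. Here is a standalone sketch with made-up numbers, assuming a pyarrow-read dataset; it folds the PyArrowDataset check into the mismatch test, so it is an illustration rather than the commit's exact code path.

# Sketch of the reconstruction arithmetic, with made-up metadata:
# RangeIndex(start=10, stop=70, step=2) implies (70 - 10) / 2 == 30 rows,
# but filters kept only 12 rows.
import pandas

range_index_metadata = {"name": None, "start": 10, "stop": 70, "step": 2}
total_filtered_length = 12

metadata_implied_length = (
    range_index_metadata["stop"] - range_index_metadata["start"]
) / range_index_metadata["step"]

if total_filtered_length != metadata_implied_length:
    # pyarrow discards inconsistent metadata, so fall back to a default index.
    complete_index = pandas.RangeIndex(total_filtered_length)
else:
    # Otherwise keep the stored start/step and recompute the stop.
    complete_index = pandas.RangeIndex(
        start=range_index_metadata["start"],
        step=range_index_metadata["step"],
        stop=range_index_metadata["start"]
        + total_filtered_length * range_index_metadata["step"],
        name=range_index_metadata["name"],
    )

print(complete_index)  # RangeIndex(start=0, stop=12, step=1)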
[Diff for the third changed file (244 additions, 18 deletions) did not load.]
