Skip to content

Commit

Permalink
Fix arrow-based round trip of empty dataframes (#15373)
Browse files Browse the repository at this point in the history
When materializing range indices we were not previously creating the correct metadata. So do that.

While here, tidy up a few corner cases around creating range indices when constructing empty data frames.

- Closes #12243 
- Closes #14159

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15373
  • Loading branch information
wence- authored Mar 23, 2024
1 parent b29fc1d commit dda3f31
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 41 deletions.
40 changes: 22 additions & 18 deletions python/cudf/cudf/_lib/utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ cpdef generate_pandas_metadata(table, index):
types = []
index_levels = []
index_descriptors = []

columns_to_convert = list(table._columns)
# Columns
for name, col in table._data.items():
if cudf.get_option("mode.pandas_compatible"):
Expand Down Expand Up @@ -90,6 +90,7 @@ cpdef generate_pandas_metadata(table, index):
types.append(np_to_pa_dtype(col.dtype))

# Indexes
materialize_index = False
if index is not False:
for level, name in enumerate(table._index.names):
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
Expand All @@ -107,22 +108,26 @@ cpdef generate_pandas_metadata(table, index):
"step": table.index.step,
}
else:
materialize_index = True
# When `index=True`, RangeIndex needs to be materialized.
materialized_idx = cudf.Index(idx._values, name=idx.name)
descr = \
_index_level_name(
index_name=materialized_idx.name,
level=level,
column_names=col_names
)
index_levels.append(materialized_idx)
else:
descr = \
_index_level_name(
index_name=idx.name,
descr = _index_level_name(
index_name=materialized_idx.name,
level=level,
column_names=col_names
)
index_levels.append(materialized_idx)
columns_to_convert.append(materialized_idx._values)
col_names.append(descr)
types.append(np_to_pa_dtype(materialized_idx.dtype))
else:
descr = _index_level_name(
index_name=idx.name,
level=level,
column_names=col_names
)
columns_to_convert.append(idx._values)
col_names.append(descr)
if isinstance(idx.dtype, cudf.CategoricalDtype):
raise ValueError(
"'category' column dtypes are currently not "
Expand All @@ -141,17 +146,16 @@ cpdef generate_pandas_metadata(table, index):
types.append(np_to_pa_dtype(idx.dtype))

index_levels.append(idx)
col_names.append(name)
index_descriptors.append(descr)

df_meta = table.head(0)
if materialize_index:
df_meta.index = df_meta.index._as_int_index()
metadata = pa.pandas_compat.construct_metadata(
columns_to_convert=[
col
for col in table._columns
],
columns_to_convert=columns_to_convert,
# It is OKAY to do `.head(0).to_pandas()` because
# this method will extract `.columns` metadata only
df=table.head(0).to_pandas(),
df=df_meta.to_pandas(),
column_names=col_names,
index_levels=index_levels,
index_descriptors=index_descriptors,
Expand Down
43 changes: 24 additions & 19 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5485,14 +5485,18 @@ def from_arrow(cls, table):
return out

@_cudf_nvtx_annotate
def to_arrow(self, preserve_index=True):
def to_arrow(self, preserve_index=None):
"""
Convert to a PyArrow Table.
Parameters
----------
preserve_index : bool, default True
whether index column and its meta data needs to be saved or not
preserve_index : bool, optional
whether index column and its meta data needs to be saved
or not. The default of None will store the index as a
column, except for a RangeIndex which is stored as
metadata only. Setting preserve_index to True will force
a RangeIndex to be materialized.
Returns
-------
Expand Down Expand Up @@ -5523,34 +5527,35 @@ def to_arrow(self, preserve_index=True):

data = self.copy(deep=False)
index_descr = []
if preserve_index:
if isinstance(self.index, cudf.RangeIndex):
write_index = preserve_index is not False
keep_range_index = write_index and preserve_index is None
index = self.index
if write_index:
if isinstance(index, cudf.RangeIndex) and keep_range_index:
descr = {
"kind": "range",
"name": self.index.name,
"start": self.index._start,
"stop": self.index._stop,
"name": index.name,
"start": index._start,
"stop": index._stop,
"step": 1,
}
else:
if isinstance(self.index, MultiIndex):
if isinstance(index, cudf.RangeIndex):
index = index._as_int_index()
index.name = "__index_level_0__"
if isinstance(index, MultiIndex):
gen_names = tuple(
f"level_{i}"
for i, _ in enumerate(self.index._data.names)
f"level_{i}" for i, _ in enumerate(index._data.names)
)
else:
gen_names = (
self.index.names
if self.index.name is not None
else ("index",)
index.names if index.name is not None else ("index",)
)
for gen_name, col_name in zip(
gen_names, self.index._data.names
):
for gen_name, col_name in zip(gen_names, index._data.names):
data._insert(
data.shape[1],
gen_name,
self.index._data[col_name],
index._data[col_name],
)
descr = gen_names[0]
index_descr.append(descr)
Expand All @@ -5560,7 +5565,7 @@ def to_arrow(self, preserve_index=True):
columns_to_convert=[self[col] for col in self._data.names],
df=self,
column_names=out.schema.names,
index_levels=[self.index],
index_levels=[index],
index_descriptors=index_descr,
preserve_index=preserve_index,
types=out.schema.types,
Expand Down
59 changes: 58 additions & 1 deletion python/cudf/cudf/tests/dataframe/test_io_serialization.py
Original file line number Diff line number Diff line change
@@ -1 +1,58 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
import contextlib
from io import BytesIO

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest

import cudf
from cudf.testing._utils import assert_eq


@pytest.mark.parametrize(
"index",
[range(1, 11), list(range(1, 11)), range(1, 11)[::2]],
ids=["RangeIndex", "IntIndex", "StridedRange"],
)
@pytest.mark.parametrize("write_index", [False, True, None])
@pytest.mark.parametrize("empty", [False, True], ids=["nonempty", "empty"])
def test_dataframe_parquet_roundtrip(index, write_index, empty):
if empty:
data = {}
else:
data = {"a": [i * 2 for i in index]}
df = cudf.DataFrame(data=data, index=index)
pf = pd.DataFrame(data=data, index=index)
gpu_buf = BytesIO()
cpu_buf = BytesIO()

df.to_parquet(gpu_buf, index=write_index)
pf.to_parquet(cpu_buf, index=write_index)
gpu_table = pq.read_table(gpu_buf)
cpu_table = pq.read_table(cpu_buf)
metadata_equal = (
gpu_table.schema.pandas_metadata == cpu_table.schema.pandas_metadata
)
if empty and write_index is not False:
# https://github.com/rapidsai/cudf/issues/15372
ctx = pytest.raises(AssertionError)
else:
ctx = contextlib.nullcontext()
with ctx:
assert metadata_equal

gpu_read = cudf.read_parquet(gpu_buf)
cpu_read = cudf.read_parquet(cpu_buf)
with ctx:
assert_eq(gpu_read, cpu_read)


@pytest.mark.parametrize("preserve_index", [False, True, None])
def test_dataframe_to_arrow_preserve_index(preserve_index):
df = cudf.DataFrame({"x": ["cat", "dog"] * 5})
pf = df.to_pandas()
expect = pa.Table.from_pandas(pf, preserve_index=preserve_index).schema
got = df.to_arrow(preserve_index=preserve_index).schema
assert expect == got
14 changes: 11 additions & 3 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2442,9 +2442,17 @@ def test_parquet_index(pdf, index):
run_parquet_index(pdf, index)


@pytest.mark.parametrize("index", [None, True])
@pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/12243",
@pytest.mark.parametrize(
"index",
[
pytest.param(
None,
marks=pytest.mark.xfail(
reason="https://github.com/apache/arrow/issues/40743"
),
),
True,
],
)
def test_parquet_index_empty(index):
pdf = pd.DataFrame(index=pd.RangeIndex(0, 10, 1))
Expand Down

0 comments on commit dda3f31

Please sign in to comment.