Fix index handling in parquet reader and writer (#6771)
Fixes: #6337, #6740

The underlying issue was that cudf failed to read or write a dataframe correctly for any index other than a RangeIndex or a MultiIndex with all levels named, so this PR revamps the logic for writing index information to, and reading it back from, parquet metadata (a small round-trip sketch follows the checklist below).

This PR:

- [x] Introduces code changes to correctly write and read Index/MultiIndex objects to and from parquet files.
- [x] Adds a `step` attribute to `RangeIndex`.
- [x] Fixes an issue where a `RangeIndex` with a `step` value other than 1 failed in `cudf.from_pandas`.
- [x] Adds and fixes pytests for the above changes.
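
A minimal round-trip sketch of the behaviour this PR targets (file name and index values are illustrative, not taken from the PR):

```python
import cudf

# DataFrame with a named, non-default index; before this fix the index was
# not reliably written to or restored from parquet metadata.
df = cudf.DataFrame({"a": [1, 2, 3], "b": [10.0, 20.0, 30.0]})
df.index = cudf.Index(["x", "y", "z"], name="my_index")

df.to_parquet("indexed.parquet")
restored = cudf.read_parquet("indexed.parquet")

# With this change the index values and name survive the round trip.
assert restored.index.name == "my_index"
assert list(restored.index.to_pandas()) == ["x", "y", "z"]
```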

Authors:
  - galipremsagar <[email protected]>
  - GALI PREM SAGAR <[email protected]>

Approvers:
  - Keith Kraus
  - Devavret Makkar

URL: #6771
galipremsagar authored Dec 2, 2020
1 parent 42644cc commit edd1af1
Showing 6 changed files with 211 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -165,6 +165,7 @@
- PR #6742 Fix concat bug in dask_cudf Series/Index creation
- PR #6632 Fix DataFrame initialization from list of dicts
- PR #6767 Fix sort order of parameters in `test_scalar_invalid_implicit_conversion` pytest
- PR #6771 Fix index handling in parquet reader and writer
- PR #6787 Update java reduction APIs to reflect C++ changes
- PR #6790 Fix result representation in groupby.apply
- PR #6794 Fix AVRO reader issues with empty input
182 changes: 129 additions & 53 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -6,6 +6,7 @@ import cudf
import errno
import os
import pyarrow as pa
from collections import OrderedDict

try:
import ujson as json
@@ -119,37 +120,34 @@ cpdef generate_pandas_metadata(Table table, index):

# Indexes
if index is not False:
for name in table._index.names:
if name is not None:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
idx = table.index.get_level_values(name)
else:
idx = table.index

if isinstance(idx, cudf.core.index.RangeIndex):
descr = {
"kind": "range",
"name": table.index.name,
"start": table.index._start,
"stop": table.index._stop,
"step": 1,
}
else:
descr = name
col_names.append(name)
if is_categorical_dtype(idx):
raise ValueError(
"'category' column dtypes are currently not "
+ "supported by the gpu accelerated parquet writer"
)
elif is_list_dtype(col):
types.append(col.dtype.to_arrow())
else:
types.append(np_to_pa_dtype(idx.dtype))
index_levels.append(idx)
index_descriptors.append(descr)
for level, name in enumerate(table._index.names):
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
idx = table.index.get_level_values(level)
else:
col_names.append(name)
idx = table.index

if isinstance(idx, cudf.core.index.RangeIndex):
descr = {
"kind": "range",
"name": table.index.name,
"start": table.index.start,
"stop": table.index.stop,
"step": table.index.step,
}
else:
descr = _index_level_name(idx.name, level, col_names)
if is_categorical_dtype(idx):
raise ValueError(
"'category' column dtypes are currently not "
+ "supported by the gpu accelerated parquet writer"
)
elif is_list_dtype(idx):
types.append(col.dtype.to_arrow())
else:
types.append(np_to_pa_dtype(idx.dtype))
index_levels.append(idx)
col_names.append(name)
index_descriptors.append(descr)

metadata = pa.pandas_compat.construct_metadata(
table,
@@ -225,15 +223,24 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
column_names = [x.decode() for x in c_out_table.metadata.column_names]

# Access the Parquet user_data json to find the index
index_col = ''
index_col = None
cdef map[string, string] user_data = c_out_table.metadata.user_data
json_str = user_data[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
if 'index_columns' in meta and len(meta['index_columns']) > 0:
index_col = meta['index_columns'][0]

index_col = meta['index_columns']
if isinstance(index_col[0], dict) and \
index_col[0]['kind'] == 'range':
is_range_index = True
else:
is_range_index = False
index_col_names = OrderedDict()
for idx_col in index_col:
for c in meta['columns']:
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']
df = cudf.DataFrame._from_table(
Table.from_unique_ptr(
move(c_out_table.tbl),
@@ -250,7 +257,7 @@

if not column_names:
column_names = [o['name'] for o in meta['columns']]
if index_col in cols_dtype_map:
if not is_range_index and index_col in cols_dtype_map:
column_names.remove(index_col)

for col in column_names:
@@ -261,16 +268,66 @@
)

# Set the index column
if index_col is not '' and isinstance(index_col, str):
if index_col in column_names:
df = df.set_index(index_col)
new_index_name = pa.pandas_compat._backwards_compatible_index_name(
df.index.name, df.index.name
)
df.index.name = new_index_name
if index_col is not None and len(index_col) > 0:
if is_range_index:
range_index_meta = index_col[0]
if row_groups is not None:
per_file_metadata = [
pa.parquet.read_metadata(s) for s in filepaths_or_buffers
]

filtered_idx = []
for i, file_meta in enumerate(per_file_metadata):
row_groups_i = []
start = 0
for row_group in range(file_meta.num_row_groups):
stop = start + file_meta.row_group(row_group).num_rows
row_groups_i.append((start, stop))
start = stop

for rg in row_groups[i]:
filtered_idx.append(
cudf.RangeIndex(
start=row_groups_i[rg][0],
stop=row_groups_i[rg][1],
step=range_index_meta['step']
)
)

if len(filtered_idx) > 0:
idx = cudf.concat(filtered_idx)
else:
idx = cudf.Index(cudf.core.column.column_empty(0))
else:
idx = cudf.RangeIndex(
start=range_index_meta['start'],
stop=range_index_meta['stop'],
step=range_index_meta['step'],
name=range_index_meta['name']
)
if skiprows is not None:
idx = idx[skiprows:]
if num_rows is not None:
idx = idx[:num_rows]
df.index = idx
elif set(index_col).issubset(column_names):
index_data = df[index_col]
actual_index_names = list(index_col_names.values())
if len(index_data._data) == 1:
idx = cudf.Index(
index_data._data.columns[0],
name=actual_index_names[0]
)
else:
idx = cudf.MultiIndex.from_frame(
index_data,
names=actual_index_names
)
df.drop(columns=index_col, inplace=True)
df.index = idx
else:
if use_pandas_metadata:
df.index.name = index_col
df.index.names = index_col

return df
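
For readers skimming the diff, a pure-Python sketch of the range-index reconstruction above (the row-group sizes and selection are made-up example values; the real code reads them from the parquet metadata): per-file row-group row counts become `(start, stop)` windows, one `RangeIndex` is built per selected row group, and the pieces are concatenated.

```python
import cudf

row_group_sizes = [4, 3]   # rows per row group in one file (example values)
selected = [1]             # row groups actually requested by the caller

# Turn cumulative row counts into (start, stop) windows, as the loop above does.
windows, start = [], 0
for n in row_group_sizes:
    windows.append((start, start + n))
    start += n

# One RangeIndex per selected row group, concatenated into the final index,
# so the restored index lines up with the rows that were actually read.
parts = [
    cudf.RangeIndex(start=windows[i][0], stop=windows[i][1], step=1)
    for i in selected
]
idx = cudf.concat(parts)   # index values 4, 5, 6 in this example
```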

@@ -296,21 +353,20 @@ cpdef write_parquet(

cdef vector[string] column_names
cdef map[string, string] user_data
cdef table_view tv = table.data_view()
cdef table_view tv
cdef unique_ptr[cudf_io_types.data_sink] _data_sink
cdef cudf_io_types.sink_info sink = make_sink_info(path, _data_sink)

if index is not False:
if index is not False and not isinstance(table._index, cudf.RangeIndex):
tv = table.view()
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
for idx_name in table._index.names:
column_names.push_back(str.encode(idx_name))
else:
if table._index.name is not None:
column_names.push_back(str.encode(table._index.name))
else:
# No named index exists so just write out columns
tv = table.data_view()
for level, idx_name in enumerate(table._index.names):
column_names.push_back(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
)
else:
tv = table.data_view()

for col_name in table._column_names:
column_names.push_back(str.encode(col_name))
@@ -544,3 +600,23 @@ cdef Column _update_column_struct_field_names(
)
col.set_base_children(tuple(children))
return col


def _index_level_name(index_name, level, column_names):
"""
Return the name of an index level or a default name
    if `index_name` is None or is already a column name.

    Parameters
    ----------
    index_name : name of an Index object
    level : level of the Index object
    column_names : list of column names in the table

    Returns
    -------
    name : str
"""
if index_name is not None and index_name not in column_names:
return index_name
else:
return f"__index_level_{level}__"
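
For reference, the fallback mirrors pandas' `__index_level_N__` convention (illustrative calls against the helper above):

```python
_index_level_name("region", 0, ["a", "b"])   # -> "region"
_index_level_name(None, 0, ["a", "b"])       # -> "__index_level_0__"
_index_level_name("a", 1, ["a", "b"])        # -> "__index_level_1__"  (name collides with a column)
```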
9 changes: 2 additions & 7 deletions python/cudf/cudf/core/dataframe.py
@@ -4940,10 +4940,7 @@ def from_pandas(cls, dataframe, nan_as_null=None):
df.columns = dataframe.columns

# Set index
if isinstance(dataframe.index, pd.MultiIndex):
index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null)
else:
index = dataframe.index
index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null)
result = df.set_index(index)

return result
@@ -7137,10 +7134,8 @@ def from_pandas(obj, nan_as_null=None):
elif isinstance(obj, pd.MultiIndex):
return cudf.MultiIndex.from_pandas(obj, nan_as_null=nan_as_null)
elif isinstance(obj, pd.RangeIndex):
if obj._step and obj._step != 1:
raise ValueError("cudf RangeIndex requires step == 1")
return cudf.core.index.RangeIndex(
obj._start, stop=obj._stop, name=obj.name
start=obj.start, stop=obj.stop, step=obj.step, name=obj.name
)
elif isinstance(obj, pd.Index):
return cudf.Index.from_pandas(obj, nan_as_null=nan_as_null)
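
A hedged sketch of what the simplified `from_pandas` path now handles (values are arbitrary):

```python
import pandas as pd
import cudf

pdf = pd.DataFrame(
    {"a": [1, 2, 3, 4]},
    index=pd.RangeIndex(start=0, stop=8, step=2, name="idx"),
)
gdf = cudf.from_pandas(pdf)

# A RangeIndex with step != 1 is now preserved instead of raising ValueError.
assert isinstance(gdf.index, cudf.RangeIndex)
assert (gdf.index.start, gdf.index.stop, gdf.index.step) == (0, 8, 2)
```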
7 changes: 7 additions & 0 deletions python/cudf/cudf/core/index.py
@@ -1532,6 +1532,13 @@ def stop(self):
"""
return self._stop

@property
def step(self):
"""
The value of the step parameter.
"""
return self._step

@property
def _num_columns(self):
return 1
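
A minimal sketch of the new accessor (values are arbitrary):

```python
import cudf

idx = cudf.RangeIndex(start=0, stop=10, step=2)
idx.step   # 2, alongside the existing idx.start and idx.stop accessors
```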
10 changes: 5 additions & 5 deletions python/cudf/cudf/tests/test_pandas_interop.py
@@ -1,8 +1,7 @@
# Copyright (c) 2018, NVIDIA CORPORATION.
# Copyright (c) 2018-2020, NVIDIA CORPORATION.

import numpy as np
import pandas as pd
import pytest

import cudf
from cudf.core import DataFrame
@@ -85,6 +84,7 @@ def test_from_pandas_rangeindex():


def test_from_pandas_rangeindex_step():
idx1 = pd.RangeIndex(start=0, stop=8, step=2, name="myindex")
with pytest.raises(ValueError):
cudf.from_pandas(idx1)
expected = pd.RangeIndex(start=0, stop=8, step=2, name="myindex")
actual = cudf.from_pandas(expected)

assert_eq(expected, actual)