Skip to content

Commit

Permalink
Fix columns & index handling in dataframe constructor (#6838)
Browse files Browse the repository at this point in the history
Fixes: #6821 

This PR fixes an issue where `columns` and `index` are not handled correctly in specific scenarios.

Authors:
  - galipremsagar <[email protected]>
  - GALI PREM SAGAR <[email protected]>

Approvers:
  - Richard (Rick) Zamora
  - Ashwin Srinath

URL: #6838
  • Loading branch information
galipremsagar authored Dec 8, 2020
1 parent 917759b commit f6b16ab
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

## Improvements

- PR #6838 Fix `columns` & `index` handling in dataframe constructor

## Bug Fixes

- PR #6912 Fix rmm_mode=managed parameter for gtests
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ cdef class Column:
self._size = other_col.size
self._dtype = other_col._dtype
self.set_base_data(other_col.base_data)
self.set_base_mask(other_col.base_mask)
self.set_base_children(other_col.base_children)
self.set_base_mask(other_col.base_mask)
else:
return other_col

Expand Down
94 changes: 40 additions & 54 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,34 +200,52 @@ def __init__(self, data=None, index=None, columns=None, dtype=None):
"""
super().__init__()

if isinstance(columns, (Series, cudf.Index)):
columns = columns.to_pandas()

if isinstance(data, ColumnAccessor):
self._data = data
if index is None:
index = as_index(range(self._data.nrows))
self.index = as_index(index)
return None
index = as_index(range(data.nrows))
else:
index = as_index(index)
self._index = index

if isinstance(data, DataFrame):
self._data = data._data
self._index = data._index
self.columns = data.columns
return
if columns is not None:
self._data = data
self._reindex(columns=columns, deep=True, inplace=True)
else:
self._data = data

if isinstance(data, pd.DataFrame):
data = self.from_pandas(data)
self._data = data._data
self._index = data._index
self.columns = data.columns
return
elif isinstance(data, (DataFrame, pd.DataFrame)):
if isinstance(data, pd.DataFrame):
data = self.from_pandas(data)

if data is None:
if index is not None:
if not data.index.equals(index):
data = data.reindex(index)
index = data._index
else:
index = as_index(index)
else:
index = data._index

self._index = index

if columns is not None:
self._data = data._data
self._reindex(
columns=columns, index=index, deep=False, inplace=True
)
else:
self._data = data._data
self.columns = data.columns

elif data is None:
if index is None:
self._index = RangeIndex(0)
else:
self._index = as_index(index)
if columns is not None:
if isinstance(columns, (Series, cudf.Index)):
columns = columns.to_pandas()

self._data = ColumnAccessor(
OrderedDict.fromkeys(
Expand Down Expand Up @@ -2560,48 +2578,16 @@ def reindex(

df = self
cols = columns
original_cols = df._data
dtypes = OrderedDict(df.dtypes)
idx = labels if index is None and axis in (0, "index") else index
cols = labels if cols is None and axis in (1, "columns") else cols
df = df if cols is None else df[list(set(df.columns) & set(cols))]

if idx is not None:
idx = as_index(idx)

if isinstance(idx, cudf.core.MultiIndex):
idx_dtype_match = (
df.index._source_data.dtypes == idx._source_data.dtypes
).all()
else:
idx_dtype_match = df.index.dtype == idx.dtype

if not idx_dtype_match:
cols = cols if cols is not None else list(df.columns)
df = DataFrame()
else:
df = DataFrame(None, idx).join(df, how="left", sort=True)
# double-argsort to map back from sorted to unsorted positions
df = df.take(idx.argsort(ascending=True).argsort())

idx = idx if idx is not None else df.index
names = cols if cols is not None else list(df.columns)

length = len(idx)
cols = OrderedDict()

for name in names:
if name in df:
cols[name] = df._data[name].copy(deep=copy)
else:
dtype = dtypes.get(name, np.float64)
col = original_cols.get(name, Series(dtype=dtype)._column)
col = column.column_empty_like(
col, dtype=dtype, masked=True, newsize=length
)
cols[name] = col
result = df._reindex(
columns=cols, dtypes=dtypes, deep=copy, index=idx, inplace=False
)

return DataFrame(cols, idx)
return result

def _set_index(
self, index, to_drop=None, inplace=False, verify_integrity=False,
Expand Down
75 changes: 74 additions & 1 deletion python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
import copy
import functools
import operator
import warnings
from collections import OrderedDict, abc as abc
import operator

import cupy
import numpy as np
Expand Down Expand Up @@ -3301,6 +3301,79 @@ def _encode(self):
keys = self.__class__._from_table(keys)
return keys, indices

def _reindex(
    self, columns, dtypes=None, deep=False, index=None, inplace=False
):
    """
    Helper for `.reindex`

    Parameters
    ----------
    columns : array-like
        The list of columns to select from the Frame,
        if ``columns`` is a superset of ``Frame.columns`` new
        columns are created.
    dtypes : dict
        Mapping of dtypes for the empty columns being created.
    deep : boolean, optional, default False
        Whether to make deep copy or shallow copy of the columns.
    index : Index or array-like, default None
        The ``index`` to be used to reindex the Frame with.
    inplace : bool, default False
        Whether to perform the operation in place on the data.

    Returns
    -------
    DataFrame
    """
    if dtypes is None:
        dtypes = {}

    df = self
    if index is not None:
        index = cudf.core.index.as_index(index)

        # Row alignment is only attempted when index dtypes agree;
        # for a MultiIndex every level's dtype must match.
        if isinstance(index, cudf.core.MultiIndex):
            idx_dtype_match = (
                df.index._source_data.dtypes == index._source_data.dtypes
            ).all()
        else:
            idx_dtype_match = df.index.dtype == index.dtype

        if not idx_dtype_match:
            # Incompatible index dtype: no row can align, so start from
            # an empty frame; all requested columns are created as
            # all-null columns in the loop below.
            columns = columns if columns is not None else list(df.columns)
            df = cudf.DataFrame()
        else:
            # Left-join onto a frame holding only the target index to
            # align rows with it (the join sorts the result).
            df = cudf.DataFrame(None, index).join(
                df, how="left", sort=True
            )
            # double-argsort to map back from sorted to unsorted positions
            df = df.take(index.argsort(ascending=True).argsort())

    cols = OrderedDict()
    index = index if index is not None else df.index
    names = columns if columns is not None else list(df.columns)
    for name in names:
        if name in df._data:
            cols[name] = df._data[name].copy(deep=deep)
        else:
            # Requested column missing from the frame: create an
            # all-null column of the requested dtype (default
            # np.float64 — presumably to mirror pandas' NaN-filled
            # float columns; confirm against pandas.reindex).
            dtype = dtypes.get(name, np.float64)
            cols[name] = column_empty(
                dtype=dtype, masked=True, row_count=len(index)
            )
    # Rebuild as the caller's class, preserving the accessor's
    # multiindex/level-name metadata from self.
    result = self.__class__._from_table(
        Frame(
            data=cudf.core.column_accessor.ColumnAccessor(
                cols,
                multiindex=self._data.multiindex,
                level_names=self._data.level_names,
            )
        ),
        index=index,
    )

    return self._mimic_inplace(result, inplace=inplace)


def _get_replacement_values(to_replace, replacement, col_name, column):

Expand Down
59 changes: 59 additions & 0 deletions python/cudf/cudf/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8015,3 +8015,62 @@ def test_dataframe_from_pandas_duplicate_columns():
ValueError, match="Duplicate column names are not allowed"
):
gd.from_pandas(pdf)


@pytest.mark.parametrize(
    "df",
    [
        pd.DataFrame(
            {"a": [1, 2, 3], "b": [10, 11, 20], "c": ["a", "bcd", "xyz"]}
        ),
        pd.DataFrame(),
    ],
)
@pytest.mark.parametrize(
    "columns",
    [
        None,
        ["a"],
        ["c", "a"],
        ["b", "a", "c"],
        [],
        pd.Index(["c", "a"]),
        gd.Index(["c", "a"]),
        ["abc", "a"],
        ["column_not_exists1", "column_not_exists2"],
    ],
)
@pytest.mark.parametrize("index", [["abc", "def", "ghi"]])
def test_dataframe_constructor_columns(df, columns, index):
    """The cudf DataFrame constructor must honor ``columns`` and
    ``index`` the same way pandas does, for both DataFrame and
    ColumnAccessor inputs."""

    def assert_local_eq(actual, df, expected, host_columns):
        # An empty expected frame has no meaningful index type to check.
        check_index_type = not expected.empty
        if host_columns is not None and any(
            col not in df.columns for col in host_columns
        ):
            # Columns absent from the source frame are created as
            # all-null columns whose dtype may legitimately differ
            # between cudf and pandas, so relax the dtype check.
            assert_eq(
                expected,
                actual,
                check_dtype=False,
                check_index_type=check_index_type,
            )
        else:
            assert_eq(expected, actual, check_index_type=check_index_type)

    gdf = gd.from_pandas(df)
    # pandas cannot consume a cudf Index directly; convert it for the
    # host-side (pandas) construction.
    host_columns = (
        columns.to_pandas() if isinstance(columns, gd.Index) else columns
    )

    # Case 1: construct from an existing DataFrame.
    expected = pd.DataFrame(df, columns=host_columns, index=index)
    actual = gd.DataFrame(gdf, columns=columns, index=index)

    assert_local_eq(actual, df, expected, host_columns)

    # Case 2: construct from a ColumnAccessor (cudf-only input path).
    expected = pd.DataFrame(df, columns=host_columns)
    actual = gd.DataFrame(gdf._data, columns=columns, index=index)
    if index is not None:
        # A ColumnAccessor carries no index of its own, so the expected
        # pandas frame must have the index applied after construction.
        if df.shape == (0, 0):
            expected = pd.DataFrame(columns=host_columns, index=index)
        else:
            expected.index = index
    assert_local_eq(actual, df, expected, host_columns)
13 changes: 10 additions & 3 deletions python/dask_cudf/dask_cudf/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _quantile(a, q):
n = len(a)
if not len(a):
return None, n
return (a.quantiles(q.tolist(), interpolation="nearest"), n)
return (a.quantiles(q=q.tolist(), interpolation="nearest"), n)


def merge_quantiles(finalq, qs, vals):
Expand Down Expand Up @@ -121,7 +121,7 @@ def _approximate_quantile(df, q):

# Define final action (create df with quantiles as index)
def finalize_tsk(tsk):
return (final_type, tsk, q)
return (final_type, tsk)

return_type = df.__class__

Expand Down Expand Up @@ -162,7 +162,14 @@ def finalize_tsk(tsk):
}
dsk = toolz.merge(val_dsk, merge_dsk)
graph = HighLevelGraph.from_collections(name2, dsk, dependencies=[df])
return return_type(graph, name2, meta, new_divisions)
df = return_type(graph, name2, meta, new_divisions)

def set_quantile_index(df):
df.index = q
return df

df = df.map_partitions(set_quantile_index, meta=meta)
return df


def quantile_divisions(df, by, npartitions):
Expand Down

0 comments on commit f6b16ab

Please sign in to comment.