Skip to content

Commit

Permalink
Clean up reshaping ops (#16553)
Browse files Browse the repository at this point in the history
Uses some more "idiomatic" cudf patterns, such as:

* Checking `isinstance(column.dtype, ...)` instead of `isinstance(column, ...)` (to avoid importing the column objects)
* Using `DataFrame._from_data(dict)` instead of creating an empty `DataFrame` and adding columns one by one

Also avoids some column materialization in the `DataFrame.columns` setter (`DataFrame.columns = ...`):

* For `RangeIndex`, avoid materializing to a column to get a distinct count
* For `MultiIndex`, avoid creating a `cudf.MultiIndex` from the columns, since it is converted back to a CPU object anyway to get the column labels for the `ColumnAccessor`

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #16553
  • Loading branch information
mroeschke authored Aug 16, 2024
1 parent e16c2f2 commit 30011c5
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 67 deletions.
8 changes: 6 additions & 2 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2654,8 +2654,12 @@ def columns(self, columns):
elif isinstance(columns, (cudf.BaseIndex, ColumnBase, Series)):
level_names = (getattr(columns, "name", None),)
rangeindex = isinstance(columns, cudf.RangeIndex)
columns = as_column(columns)
if columns.distinct_count(dropna=False) != len(columns):
if rangeindex:
unique_count = len(columns)
else:
columns = as_column(columns)
unique_count = columns.distinct_count(dropna=False)
if unique_count != len(columns):
raise ValueError("Duplicate column names are not allowed")
pd_columns = pd.Index(columns.to_pandas())
label_dtype = pd_columns.dtype
Expand Down
141 changes: 76 additions & 65 deletions python/cudf/cudf/core/reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import itertools
import warnings
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

import numpy as np
import pandas as pd
Expand All @@ -14,7 +14,7 @@
from cudf.api.extensions import no_default
from cudf.core._compat import PANDAS_LT_300
from cudf.core.column import ColumnBase, as_column, column_empty_like
from cudf.core.column.categorical import CategoricalColumn
from cudf.core.column_accessor import ColumnAccessor
from cudf.utils.dtypes import min_unsigned_type

if TYPE_CHECKING:
Expand Down Expand Up @@ -101,7 +101,9 @@ def _get_combined_index(indexes, intersect: bool = False, sort=None):
return index


def _normalize_series_and_dataframe(objs, axis):
def _normalize_series_and_dataframe(
objs: list[cudf.Series | cudf.DataFrame], axis: Literal[0, 1]
) -> None:
"""Convert any cudf.Series objects in objs to DataFrames in place."""
# Default to naming series by a numerical id if they are not named.
sr_name = 0
Expand Down Expand Up @@ -335,7 +337,7 @@ def concat(
result = obj.to_frame()
else:
result = obj.copy(deep=True)
result.columns = pd.RangeIndex(len(result._data))
result.columns = cudf.RangeIndex(len(result._data))
else:
result = type(obj)._from_data(
data=obj._data.copy(deep=True),
Expand All @@ -350,7 +352,7 @@ def concat(
result = obj.copy(deep=True)
if keys_objs is not None and isinstance(result, cudf.DataFrame):
k = keys_objs[0]
result.columns = cudf.MultiIndex.from_tuples(
result.columns = pd.MultiIndex.from_tuples(
[
(k, *c) if isinstance(c, tuple) else (k, c)
for c in result._column_names
Expand All @@ -369,7 +371,6 @@ def concat(
raise TypeError(
"Can only concatenate Series and DataFrame objects when axis=1"
)
df = cudf.DataFrame()
_normalize_series_and_dataframe(objs, axis=axis)

any_empty = any(obj.empty for obj in objs)
Expand All @@ -393,18 +394,23 @@ def concat(
objs = [obj for obj in objs if obj.shape != (0, 0)]

if len(objs) == 0:
return df
# TODO: https://github.com/rapidsai/cudf/issues/16550
return cudf.DataFrame()

# Don't need to align indices of all `objs` since we
# would anyway return an empty dataframe below
if not empty_inner:
objs = _align_objs(objs, how=join, sort=sort)
df.index = objs[0].index
result_index = objs[0].index
else:
result_index = None

result_data = {}
result_columns = None
if keys_objs is None:
for o in objs:
for name, col in o._data.items():
if name in df._data:
if name in result_data:
raise NotImplementedError(
f"A Column with duplicate name found: {name}, cuDF "
f"doesn't support having multiple columns with "
Expand All @@ -414,11 +420,11 @@ def concat(
# if join is inner and it contains an empty df
# we return an empty df, hence creating an empty
# column with dtype metadata retained.
df[name] = cudf.core.column.column_empty_like(
result_data[name] = cudf.core.column.column_empty_like(
col, newsize=0
)
else:
df[name] = col
result_data[name] = col

result_columns = (
objs[0]
Expand Down Expand Up @@ -451,21 +457,21 @@ def concat(
else:
col_label = (k, name)
if empty_inner:
df[col_label] = cudf.core.column.column_empty_like(
col, newsize=0
result_data[col_label] = (
cudf.core.column.column_empty_like(col, newsize=0)
)
else:
df[col_label] = col
result_data[col_label] = col

if keys_objs is None:
df.columns = result_columns.unique()
if ignore_index:
df.columns = cudf.RangeIndex(len(result_columns.unique()))
elif ignore_index:
# with ignore_index the column names change to numbers
df.columns = cudf.RangeIndex(len(result_columns))
df = cudf.DataFrame._from_data(
ColumnAccessor(result_data, verify=False), index=result_index
)
if ignore_index:
df.columns = cudf.RangeIndex(df._num_columns)
elif result_columns is not None:
df.columns = result_columns
elif not only_series:
df.columns = cudf.MultiIndex.from_tuples(df._column_names)
df.columns = pd.MultiIndex.from_tuples(df._column_names)

if empty_inner:
# if join is inner and it contains an empty df
Expand All @@ -486,6 +492,7 @@ def concat(
if len(objs) == 0:
# If objs is empty, that indicates all of
# objs are empty dataframes.
# TODO: https://github.com/rapidsai/cudf/issues/16550
return cudf.DataFrame()
elif len(objs) == 1:
obj = objs[0]
Expand Down Expand Up @@ -519,7 +526,7 @@ def concat(
elif typ is cudf.MultiIndex:
return cudf.MultiIndex._concat(objs)
elif issubclass(typ, cudf.Index):
return cudf.core.index.Index._concat(objs)
return cudf.Index._concat(objs)
else:
raise TypeError(f"cannot concatenate object of type {typ}")

Expand Down Expand Up @@ -632,18 +639,19 @@ def melt(
value_vars = [c for c in frame._column_names if c not in unique_id]

# Error for unimplemented support for datatype
dtypes = [frame[col].dtype for col in id_vars + value_vars]
if any(isinstance(typ, cudf.CategoricalDtype) for typ in dtypes):
if any(
isinstance(frame[col].dtype, cudf.CategoricalDtype)
for col in id_vars + value_vars
):
raise NotImplementedError(
"Categorical columns are not yet supported for function"
)

# Check dtype homogeneity in value_var
# Because heterogeneous concat is unimplemented
dtypes = [frame[col].dtype for col in value_vars]
if len(dtypes) > 0:
dtype = dtypes[0]
if any(t != dtype for t in dtypes):
if len(value_vars) > 1:
dtype = frame[value_vars[0]].dtype
if any(frame[col].dtype != dtype for col in value_vars):
raise ValueError("all cols in value_vars must have the same dtype")

# overlap
Expand Down Expand Up @@ -969,37 +977,39 @@ def _pivot(df, index, columns):
index_labels, index_idx = index._encode()
column_labels = columns_labels.to_pandas().to_flat_index()

def as_tuple(x):
return x if isinstance(x, tuple) else (x,)

result = {}
for v in df:
names = [as_tuple(v) + as_tuple(name) for name in column_labels]
if len(index_labels) != 0 and len(columns_labels) != 0:

def as_tuple(x):
return x if isinstance(x, tuple) else (x,)

nrows = len(index_labels)
ncols = len(names)
num_elements = nrows * ncols
if num_elements > 0:
col = df._data[v]
for col_label, col in df._data.items():
names = [
as_tuple(col_label) + as_tuple(name) for name in column_labels
]
new_size = nrows * len(names)
scatter_map = (columns_idx * np.int32(nrows)) + index_idx
target = cudf.DataFrame._from_data(
{
None: cudf.core.column.column_empty_like(
col, masked=True, newsize=nrows * ncols
)
}
target_col = cudf.core.column.column_empty_like(
col, masked=True, newsize=new_size
)
target._data[None][scatter_map] = col
result_frames = target._split(range(nrows, nrows * ncols, nrows))
target_col[scatter_map] = col
target = cudf.Index._from_column(target_col)
result.update(
{
name: next(iter(f._columns))
for name, f in zip(names, result_frames)
name: idx._column
for name, idx in zip(
names, target._split(range(nrows, new_size, nrows))
)
}
)

# the result of pivot always has a multicolumn
ca = cudf.core.column_accessor.ColumnAccessor(
result, multiindex=True, level_names=(None,) + columns._data.names
ca = ColumnAccessor(
result,
multiindex=True,
level_names=(None,) + columns._data.names,
verify=False,
)
return cudf.DataFrame._from_data(
ca, index=cudf.Index(index_labels, name=index.name)
Expand Down Expand Up @@ -1070,19 +1080,20 @@ def pivot(data, columns=None, index=no_default, values=no_default):
if index is no_default:
index = df.index
else:
index = cudf.core.index.Index(df.loc[:, index])
index = cudf.Index(df.loc[:, index])
columns = cudf.Index(df.loc[:, columns])

# Create a DataFrame composed of columns from both
# columns and index
columns_index = {}
columns_index = {
i: col
for i, col in enumerate(
itertools.chain(index._data.columns, columns._data.columns)
)
}
columns_index = cudf.DataFrame(columns_index)
ca = ColumnAccessor(
dict(
enumerate(
itertools.chain(index._data.columns, columns._data.columns)
)
),
verify=False,
)
columns_index = cudf.DataFrame._from_data(ca)

# Check that each row is unique:
if len(columns_index) != len(columns_index.drop_duplicates()):
Expand Down Expand Up @@ -1225,13 +1236,13 @@ def unstack(df, level, fill_value=None, sort: bool = True):
return result


def _get_unique(column, dummy_na):
def _get_unique(column: ColumnBase, dummy_na: bool) -> ColumnBase:
"""
Returns unique values in a column, if
dummy_na is False, nan's are also dropped.
"""
if isinstance(column, cudf.core.column.CategoricalColumn):
unique = column.categories
if isinstance(column.dtype, cudf.CategoricalDtype):
unique = column.categories # type: ignore[attr-defined]
else:
unique = column.unique().sort_values()
if not dummy_na:
Expand All @@ -1251,11 +1262,11 @@ def _one_hot_encode_column(
`prefix`, separated with category name with `prefix_sep`. The encoding
columns maybe coerced into `dtype`.
"""
if isinstance(column, CategoricalColumn):
if isinstance(column.dtype, cudf.CategoricalDtype):
if column.size == column.null_count:
column = column_empty_like(categories, newsize=column.size)
else:
column = column._get_decategorized_column()
column = column._get_decategorized_column() # type: ignore[attr-defined]

if column.size * categories.size >= np.iinfo(size_type_dtype).max:
raise ValueError(
Expand Down Expand Up @@ -1536,7 +1547,7 @@ def pivot_table(
table_columns = tuple(
map(lambda column: column[1:], table._data.names)
)
table.columns = cudf.MultiIndex.from_tuples(
table.columns = pd.MultiIndex.from_tuples(
tuples=table_columns, names=column_names
)

Expand Down

0 comments on commit 30011c5

Please sign in to comment.