Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate .cov and .corr for sort groupby #10386

Merged
merged 14 commits into from
Mar 12, 2022
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 29 additions & 70 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,63 +942,12 @@ def corr(self, method="pearson", min_periods=1):
"Only pearson correlation is currently supported"
)

# create expanded dataframe consisting all combinations of the
# struct columns-pairs to be correlated
# i.e (('col1', 'col1'), ('col1', 'col2'), ('col2', 'col2'))
_cols = self.grouping.values._data.to_pandas_index().tolist()
len_cols = len(_cols)

new_df_data = {}
for x, y in itertools.combinations_with_replacement(_cols, 2):
new_df_data[(x, y)] = cudf.DataFrame._from_data(
{"x": self.obj._data[x], "y": self.obj._data[y]}
).to_struct()
new_gb = cudf.DataFrame._from_data(new_df_data).groupby(
by=self.grouping.keys
)

try:
gb_corr = new_gb.agg(lambda x: x.corr(method, min_periods))
except RuntimeError as e:
if "Unsupported groupby reduction type-agg combination" in str(e):
raise TypeError(
"Correlation accepts only numerical column-pairs"
)
raise

# ensure that column-pair labels are arranged in ascending order
cols_list = [
(y, x) if i > j else (x, y)
for j, y in enumerate(_cols)
for i, x in enumerate(_cols)
]
cols_split = [
cols_list[i : i + len_cols]
for i in range(0, len(cols_list), len_cols)
]

# interleave: combine the correlation results for each column-pair
# into a single column
res = cudf.DataFrame._from_data(
{
x: gb_corr.loc[:, i].interleave_columns()
for i, x in zip(cols_split, _cols)
}
)

# create a multiindex for the groupby correlated dataframe,
# to match pandas behavior
unsorted_idx = gb_corr.index.repeat(len_cols)
idx_sort_order = unsorted_idx._get_sorted_inds()
sorted_idx = unsorted_idx._gather(idx_sort_order)
if len(gb_corr):
# TO-DO: Should the operation below be done on the CPU instead?
sorted_idx._data[None] = as_column(
cudf.Series(_cols).tile(len(gb_corr.index))
def func(column_pair_groupby):
skirui-source marked this conversation as resolved.
Show resolved Hide resolved
return column_pair_groupby.agg(
lambda x: x.corr(method, min_periods)
)
res.index = MultiIndex._from_data(sorted_idx._data)

return res
return self._cov_or_corr(func, "Correlation")
skirui-source marked this conversation as resolved.
Show resolved Hide resolved

def cov(self, min_periods=0, ddof=1):
"""
Expand Down Expand Up @@ -1085,8 +1034,20 @@ def cov(self, min_periods=0, ddof=1):
val3 3.833333 12.333333 12.333333
"""

def func(column_pair_groupby):
skirui-source marked this conversation as resolved.
Show resolved Hide resolved
return column_pair_groupby.agg(lambda x: x.cov(min_periods, ddof))

return self._cov_or_corr(func, "Covariance")

def _cov_or_corr(self, func, method_name):
"""
Internal helper called by both corr() and cov() to perform the
sort-groupby correlation and covariance computations,
respectively.

skirui-source marked this conversation as resolved.
Show resolved Hide resolved
"""
# create expanded dataframe consisting of all combinations of the
# struct columns-pairs used in the covariance calculation
# struct columns-pairs to be used in the correlation or covariance
# i.e. (('col1', 'col1'), ('col1', 'col2'), ('col2', 'col2'))
column_names = self.grouping.values._column_names
num_cols = len(column_names)
Expand Down Expand Up @@ -1122,13 +1083,11 @@ def cov(self, min_periods=0, ddof=1):
).groupby(by=self.grouping.keys)

try:
gb_cov = column_pair_groupby.agg(
lambda x: x.cov(min_periods, ddof)
)
gb_cov_corr = func(column_pair_groupby)
except RuntimeError as e:
if "Unsupported groupby reduction type-agg combination" in str(e):
raise TypeError(
"Covariance accepts only numerical column-pairs"
f"{method_name} accepts only numerical column-pairs"
)
raise

Expand All @@ -1143,29 +1102,29 @@ def cov(self, min_periods=0, ddof=1):
for i in range(0, len(cols_list), num_cols)
]

def combine_columns(gb_cov, ys):
list_of_columns = [gb_cov._data[y] for y in ys]
def combine_columns(gb_cov_corr, ys):
list_of_columns = [gb_cov_corr._data[y] for y in ys]
frame = cudf.core.frame.Frame._from_columns(list_of_columns, ys)
return interleave_columns(frame)

# interleave: combine the correlation results for each column-pair
# into a single column
# interleave: combine the correlation or covariance results for each
# column-pair into a single column
res = cudf.DataFrame._from_data(
{
x: combine_columns(gb_cov, ys)
x: combine_columns(gb_cov_corr, ys)
for ys, x in zip(cols_split, column_names)
}
)

# create a multiindex for the groupby correlated dataframe,
# to match pandas behavior
unsorted_idx = gb_cov.index.repeat(num_cols)
# create a multiindex for the groupby covariance or correlation
# dataframe, to match pandas behavior
unsorted_idx = gb_cov_corr.index.repeat(num_cols)
idx_sort_order = unsorted_idx._get_sorted_inds()
sorted_idx = unsorted_idx._gather(idx_sort_order)
if len(gb_cov):
if len(gb_cov_corr):
# TO-DO: Should the operation below be done on the CPU instead?
sorted_idx._data[None] = as_column(
np.tile(column_names, len(gb_cov.index))
np.tile(column_names, len(gb_cov_corr.index))
)
res.index = MultiIndex._from_data(sorted_idx._data)

Expand Down