Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate .cov and .corr for sort groupby #10386

Merged
merged 14 commits into from
Mar 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 25 additions & 71 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,64 +942,10 @@ def corr(self, method="pearson", min_periods=1):
"Only pearson correlation is currently supported"
)

# create expanded dataframe consisting all combinations of the
# struct columns-pairs to be correlated
# i.e (('col1', 'col1'), ('col1', 'col2'), ('col2', 'col2'))
_cols = self.grouping.values._data.to_pandas_index().tolist()
len_cols = len(_cols)

new_df_data = {}
for x, y in itertools.combinations_with_replacement(_cols, 2):
new_df_data[(x, y)] = cudf.DataFrame._from_data(
{"x": self.obj._data[x], "y": self.obj._data[y]}
).to_struct()
new_gb = cudf.DataFrame._from_data(new_df_data).groupby(
by=self.grouping.keys
)

try:
gb_corr = new_gb.agg(lambda x: x.corr(method, min_periods))
except RuntimeError as e:
if "Unsupported groupby reduction type-agg combination" in str(e):
raise TypeError(
"Correlation accepts only numerical column-pairs"
)
raise

# ensure that column-pair labels are arranged in ascending order
cols_list = [
(y, x) if i > j else (x, y)
for j, y in enumerate(_cols)
for i, x in enumerate(_cols)
]
cols_split = [
cols_list[i : i + len_cols]
for i in range(0, len(cols_list), len_cols)
]

# interleave: combine the correlation results for each column-pair
# into a single column
res = cudf.DataFrame._from_data(
{
x: gb_corr.loc[:, i].interleave_columns()
for i, x in zip(cols_split, _cols)
}
return self._cov_or_corr(
lambda x: x.corr(method, min_periods), "Correlation"
)

# create a multiindex for the groupby correlated dataframe,
# to match pandas behavior
unsorted_idx = gb_corr.index.repeat(len_cols)
idx_sort_order = unsorted_idx._get_sorted_inds()
sorted_idx = unsorted_idx._gather(idx_sort_order)
if len(gb_corr):
# TO-DO: Should the operation below be done on the CPU instead?
sorted_idx._data[None] = as_column(
cudf.Series(_cols).tile(len(gb_corr.index))
)
res.index = MultiIndex._from_data(sorted_idx._data)

return res

def cov(self, min_periods=0, ddof=1):
"""
Compute the pairwise covariance among the columns of a DataFrame,
Expand Down Expand Up @@ -1085,8 +1031,18 @@ def cov(self, min_periods=0, ddof=1):
val3 3.833333 12.333333 12.333333
"""

return self._cov_or_corr(
lambda x: x.cov(min_periods, ddof), "Covariance"
)

def _cov_or_corr(self, func, method_name):
"""
internal function that is called by either corr() or cov()
for sort groupby correlation and covariance computations,
respectively.
"""
# create expanded dataframe consisting all combinations of the
# struct columns-pairs used in the covariance calculation
# struct columns-pairs to be used in the correlation or covariance
# i.e. (('col1', 'col1'), ('col1', 'col2'), ('col2', 'col2'))
column_names = self.grouping.values._column_names
num_cols = len(column_names)
Expand Down Expand Up @@ -1122,13 +1078,11 @@ def cov(self, min_periods=0, ddof=1):
).groupby(by=self.grouping.keys)

try:
gb_cov = column_pair_groupby.agg(
lambda x: x.cov(min_periods, ddof)
)
gb_cov_corr = column_pair_groupby.agg(func)
except RuntimeError as e:
if "Unsupported groupby reduction type-agg combination" in str(e):
raise TypeError(
"Covariance accepts only numerical column-pairs"
f"{method_name} accepts only numerical column-pairs"
)
raise

Expand All @@ -1143,29 +1097,29 @@ def cov(self, min_periods=0, ddof=1):
for i in range(0, len(cols_list), num_cols)
]

def combine_columns(gb_cov, ys):
list_of_columns = [gb_cov._data[y] for y in ys]
def combine_columns(gb_cov_corr, ys):
list_of_columns = [gb_cov_corr._data[y] for y in ys]
frame = cudf.core.frame.Frame._from_columns(list_of_columns, ys)
return interleave_columns(frame)

# interleave: combine the correlation results for each column-pair
# into a single column
# interleave: combines the correlation or covariance results for each
# column-pair into a single column
res = cudf.DataFrame._from_data(
{
x: combine_columns(gb_cov, ys)
x: combine_columns(gb_cov_corr, ys)
for ys, x in zip(cols_split, column_names)
}
)

# create a multiindex for the groupby correlated dataframe,
# to match pandas behavior
unsorted_idx = gb_cov.index.repeat(num_cols)
# create a multiindex for the groupby covariance or correlation
# dataframe, to match pandas behavior
unsorted_idx = gb_cov_corr.index.repeat(num_cols)
idx_sort_order = unsorted_idx._get_sorted_inds()
sorted_idx = unsorted_idx._gather(idx_sort_order)
if len(gb_cov):
if len(gb_cov_corr):
# TO-DO: Should the operation below be done on the CPU instead?
sorted_idx._data[None] = as_column(
np.tile(column_names, len(gb_cov.index))
np.tile(column_names, len(gb_cov_corr.index))
)
res.index = MultiIndex._from_data(sorted_idx._data)

Expand Down