[BEAM-13605] Modify groupby.apply implementation in preparation for pandas 1.4.0 #16706

Merged · 3 commits · Feb 3, 2022
Changes from 1 commit
108 changes: 85 additions & 23 deletions sdks/python/apache_beam/dataframe/frames.py
```diff
@@ -2674,11 +2674,9 @@ def duplicated(self, keep, subset):
 
     by = subset or list(self.columns)
 
     # Workaround a bug where groupby.apply() that returns a single-element
     # Series moves index label to column
     return self.groupby(by).apply(
         lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset),
-                                columns=[None]))[None]
+                                columns=[None]))[None].droplevel(by)
```
Member Author: Similarly here.
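As an editorial illustration (toy data, not Beam code): `droplevel(by)` strips the group-key levels that the reworked `groupby.apply` now prepends, restoring the frame's original index. A minimal pandas sketch:

```python
import pandas as pd

# Shape of the index after groupby(['a', 'b']).apply(...) prepends the keys:
idx = pd.MultiIndex.from_tuples(
    [(1, 3, 0), (1, 3, 1), (2, 4, 2)], names=['a', 'b', None])
s = pd.Series([False, True, False], index=idx)

s.droplevel(['a', 'b'])  # back to the original row labels: 0, 1, 2
```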


```diff
 @frame_base.with_docs_from(pd.DataFrame)
 @frame_base.args_to_kwargs(pd.DataFrame)
```
```diff
@@ -3975,7 +3973,19 @@ def apply(self, func, *args, **kwargs):
     object of the same type as what will be returned when the pipeline is
     processing actual data. If the result is a pandas object it should have the
     same type and name (for a Series) or column types and names (for
-    a DataFrame) as the actual results."""
+    a DataFrame) as the actual results.
+
+    Note that in pandas, ``apply`` attempts to detect if the index is unmodified
+    in ``func`` (indicating ``func`` is a transform) and drops the duplicate
+    index in the output. To determine this, pandas tests the indexes for
+    equality. However, Beam cannot do this since it is sensitive to the input
+    data. Instead, this implementation tests whether the indexes are identical
+    with ``is``. See the `pandas 1.4.0 release notes
+    <https://pandas.pydata.org/docs/dev/whatsnew/v1.4.0.html#groupby-apply-consistent-transform-detection>`_
+    for a good explanation of the distinction between these approaches. In
+    practice, this just means that in some cases the Beam result will have
+    a duplicate index, whereas pandas would have dropped it."""
 
     project = _maybe_project_func(self._projection)
     grouping_indexes = self._grouping_indexes
     grouping_columns = self._grouping_columns
```
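To make the docstring's distinction concrete, here is an editorial sketch (toy data, not part of the change): pandas' transform detection compares index values, while the Beam implementation only checks object identity with ``is``:

```python
import pandas as pd

df = pd.DataFrame({'x': [1, 2]}, index=[10, 11])

# A user fn that builds a brand-new object carrying the same index values:
result = pd.DataFrame({'dup': [False, False]}, index=df.index.copy())

result.index.equals(df.index)  # True  -> value equality, pandas-style detection
result.index is df.index       # False -> identity, the check Beam uses
```

Because only the identity check is safe without looking at the data, Beam may keep a duplicate index where pandas would have dropped it.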
```diff
@@ -3986,29 +3996,73 @@ def apply(self, func, *args, **kwargs):
     fn_input = project(self._ungrouped_with_index.proxy().reset_index(
         grouping_columns, drop=True))
     result = func(fn_input)
-    if isinstance(result, pd.core.generic.NDFrame):
-      if result.index is fn_input.index:
-        proxy = result
-      else:
-        proxy = result[:0]
-
-        def index_to_arrays(index):
-          return [index.get_level_values(level)
-                  for level in range(index.nlevels)]
-
-        # The final result will have the grouped indexes + the indexes from the
-        # result
-        proxy.index = pd.MultiIndex.from_arrays(
-            index_to_arrays(self._ungrouped.proxy().index) +
-            index_to_arrays(proxy.index),
-            names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+    def index_to_arrays(index):
+      return [index.get_level_values(level)
+              for level in range(index.nlevels)]
+
+    # By default do_apply will just call pandas apply()
+    # We override it below if necessary
+    do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+    if (isinstance(result, pd.core.generic.NDFrame) and
+        result.index is fn_input.index):
+      # Special case where apply fn is a transform
+      # Note we trust that if the user fn produces a proxy with the identical
+      # index, it will produce results with identical indexes at execution
+      # time too
+      proxy = result
+    elif isinstance(result, pd.DataFrame):
+      # apply fn is not a transform, we need to make sure the original index
+      # values are prepended to the result's index
+      proxy = result[:0]
+
+      # First adjust proxy
+      proxy.index = pd.MultiIndex.from_arrays(
+          index_to_arrays(self._ungrouped.proxy().index) +
+          index_to_arrays(proxy.index),
+          names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+      # Then override do_apply function
+      new_index_names = self._ungrouped.proxy().index.names
+      if len(new_index_names) > 1:
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          # key is a tuple with the MultiIndex values for this key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key[i]] * len(df) for i in range(len(new_index_names))] +
+              index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
+      else:
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key] * len(df)] + index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
+
+      do_apply = lambda gb: pd.concat([
+          add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+          for k in gb.groups.keys()])
```
Member Author: This is the critical change - when transform detection will break us, we override do_apply with a custom implementation that executes func over each group.
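To make the mechanics concrete, a standalone sketch (toy data and names, not the Beam code itself) of the same idea: run `func` on each group explicitly and prepend the group key to the result index, so the output no longer depends on pandas' transform detection:

```python
import pandas as pd

df = pd.DataFrame({'k': ['a', 'a', 'b'], 'v': [1, 2, 3]})
gb = df.groupby('k')

def func(g):
  # Not a transform: returns a new frame, so keys must be prepended manually.
  return g[['v']] * 10

def add_key_index(key, out):
  # Prepend the (single) group key as a new outer index level.
  out.index = pd.MultiIndex.from_arrays(
      [[key] * len(out), out.index], names=['k', None])
  return out

result = pd.concat(
    [add_key_index(k, func(gb.get_group(k))) for k in gb.groups.keys()])
# result.index is now (k, original_row), regardless of pandas version.
```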

```diff
+    elif isinstance(result, pd.Series):
+      if isinstance(fn_input, pd.DataFrame):
+        # DataFrameGroupBy
+        dtype = pd.Series([result]).dtype
+        proxy = pd.DataFrame(columns=result.index,
+                             dtype=result.dtype,
+                             index=self._ungrouped.proxy().index)
+      elif isinstance(fn_input, pd.Series):
+        # SeriesGroupBy
+        proxy = pd.Series(dtype=result.dtype,
+                          name=result.name,
+                          index=index_to_arrays(self._ungrouped.proxy().index) +
+                          index_to_arrays(result[:0].index))
     else:
       # The user fn returns some non-pandas type. The expected result is a
       # Series where each element is the result of one user fn call.
       dtype = pd.Series([result]).dtype
       proxy = pd.Series([], dtype=dtype, index=self._ungrouped.proxy().index)
 
     def do_partition_apply(df):
       # Remove columns from index, we only needed them there for partitioning
       df = df.reset_index(grouping_columns, drop=True)
```
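One small detail worth highlighting (editorial note, toy values): when the user fn returns a plain Python value rather than a pandas object, the proxy dtype is inferred by wrapping a single sample result in a Series, exactly as `pd.Series([result]).dtype` does above:

```python
import pandas as pd

result = 3.5                        # e.g. what func returned on the proxy
dtype = pd.Series([result]).dtype   # float64, inferred from the one sample
proxy = pd.Series([], dtype=dtype)  # empty Series with the inferred dtype
```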
```diff
@@ -4017,7 +4071,8 @@ def do_partition_apply(df):
           by=grouping_columns or None)
 
       gb = project(gb)
-      return gb.apply(func, *args, **kwargs)
+
```
Contributor: nit: extra space

Contributor: On this topic, what's Beam's guidance on flexibility with Python styling? I'm running formatting/linting on the commit hooks; they don't seem too strict, and I don't want to focus on this too much. I suppose everyone will always impart some of their personality to the code over time. 😄

Member Author: Deleted!

Generally, anything that passes the PythonLint PreCommit (which runs the pylint and yapf checkers) is fine. That's not as opinionated as some checkers (e.g. black), so it does leave a decent amount of wiggle room, and weird things like this whitespace change can slip in. It's reasonable to point out anything like this that looks odd to you.

Contributor: SG. I'm used to working in black, but don't really mind as long as we have a checker.

```diff
+      return do_apply(gb)
 
     return DeferredDataFrame(
         expressions.ComputedExpression(
```
```diff
@@ -4117,8 +4172,15 @@ def apply_fn(df):
   @property  # type: ignore
   @frame_base.with_docs_from(DataFrameGroupBy)
   def dtypes(self):
-    grouping_columns = self._grouping_columns
-    return self.apply(lambda df: df.drop(grouping_columns, axis=1).dtypes)
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dtypes',
+            lambda gb: gb.dtypes,
+            [self._expr],
+            requires_partition_by=partitionings.Arbitrary(),
+            preserves_partition_by=partitionings.Arbitrary()
+        )
+    )
```
Member Author: It turns out the old implementation was relying on incorrect behavior in apply, so I've updated this not to use apply.
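For reference, a small sketch (toy frame) of the `DataFrameGroupBy.dtypes` accessor the new expression delegates to; this accessor existed in the pandas of this era but was deprecated in later releases:

```python
import pandas as pd

df = pd.DataFrame({'k': ['a', 'a', 'b'], 'v': [1.0, 2.0, 3.0]})
df.groupby('k').dtypes  # one row per group key; values are column dtypes
```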

Contributor: Nice


```diff
 
   fillna = frame_base.wont_implement_method(
       DataFrameGroupBy, 'fillna', explanation=(
```