-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-13605] Modify groupby.apply implementation in preparation for pandas 1.4.0 #16706
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2674,11 +2674,9 @@ def duplicated(self, keep, subset): | |
|
||
by = subset or list(self.columns) | ||
|
||
# Workaround a bug where groupby.apply() that returns a single-element | ||
# Series moves index label to column | ||
return self.groupby(by).apply( | ||
lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset), | ||
columns=[None]))[None] | ||
columns=[None]))[None].droplevel(by) | ||
|
||
@frame_base.with_docs_from(pd.DataFrame) | ||
@frame_base.args_to_kwargs(pd.DataFrame) | ||
|
@@ -3975,7 +3973,19 @@ def apply(self, func, *args, **kwargs): | |
object of the same type as what will be returned when the pipeline is | ||
processing actual data. If the result is a pandas object it should have the | ||
same type and name (for a Series) or column types and names (for | ||
a DataFrame) as the actual results.""" | ||
a DataFrame) as the actual results. | ||
|
||
Note that in pandas, ``apply`` attempts to detect if the index is unmodified | ||
in ``func`` (indicating ``func`` is a transform) and drops the duplicate | ||
index in the output. To determine this, pandas tests the indexes for | ||
equality. However, Beam cannot do this since it is sensitive to the input | ||
data, instead this implementation tests if the indexes are equivalent | ||
TheNeuralBit marked this conversation as resolved.
Show resolved
Hide resolved
|
||
with ``is``. See the `pandas 1.4.0 release notes | ||
<https://pandas.pydata.org/docs/dev/whatsnew/v1.4.0.html#groupby-apply-consistent-transform-detection>`_ | ||
for a good explanation of the distinction between these approaches. In | ||
practice, this just means that in some cases the Beam result will have | ||
a duplicate index, whereas pandas would have dropped it.""" | ||
|
||
project = _maybe_project_func(self._projection) | ||
grouping_indexes = self._grouping_indexes | ||
grouping_columns = self._grouping_columns | ||
|
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs): | |
fn_input = project(self._ungrouped_with_index.proxy().reset_index( | ||
grouping_columns, drop=True)) | ||
result = func(fn_input) | ||
if isinstance(result, pd.core.generic.NDFrame): | ||
if result.index is fn_input.index: | ||
proxy = result | ||
def index_to_arrays(index): | ||
return [index.get_level_values(level) | ||
for level in range(index.nlevels)] | ||
|
||
|
||
# By default do_apply will just call pandas apply() | ||
# We override it below if necessary | ||
do_apply = lambda gb: gb.apply(func, *args, **kwargs) | ||
|
||
if (isinstance(result, pd.core.generic.NDFrame) and | ||
result.index is fn_input.index): | ||
# Special case where apply fn is a transform | ||
# Note we trust that if the user fn produces a proxy with the identical | ||
# index, it will produce results with identical indexes at execution | ||
# time too | ||
proxy = result | ||
elif isinstance(result, pd.DataFrame): | ||
# apply fn is not a transform, we need to make sure the original index | ||
# values are prepended to the result's index | ||
proxy = result[:0] | ||
|
||
# First adjust proxy | ||
proxy.index = pd.MultiIndex.from_arrays( | ||
index_to_arrays(self._ungrouped.proxy().index) + | ||
index_to_arrays(proxy.index), | ||
names=self._ungrouped.proxy().index.names + proxy.index.names) | ||
|
||
|
||
# Then override do_apply function | ||
new_index_names = self._ungrouped.proxy().index.names | ||
if len(new_index_names) > 1: | ||
def add_key_index(key, df): | ||
# df is a dataframe or Series representing the result of func for | ||
# a single key | ||
# key is a tuple with the MultiIndex values for this key | ||
df.index = pd.MultiIndex.from_arrays( | ||
[[key[i]] * len(df) for i in range(len(new_index_names))] + | ||
index_to_arrays(df.index), | ||
names=new_index_names + df.index.names) | ||
return df | ||
else: | ||
proxy = result[:0] | ||
|
||
def index_to_arrays(index): | ||
return [index.get_level_values(level) | ||
for level in range(index.nlevels)] | ||
|
||
# The final result will have the grouped indexes + the indexes from the | ||
# result | ||
proxy.index = pd.MultiIndex.from_arrays( | ||
index_to_arrays(self._ungrouped.proxy().index) + | ||
index_to_arrays(proxy.index), | ||
names=self._ungrouped.proxy().index.names + proxy.index.names) | ||
def add_key_index(key, df): | ||
# df is a dataframe or Series representing the result of func for | ||
# a single key | ||
df.index = pd.MultiIndex.from_arrays( | ||
[[key] * len(df)] + index_to_arrays(df.index), | ||
names=new_index_names + df.index.names) | ||
return df | ||
|
||
|
||
do_apply = lambda gb: pd.concat([ | ||
add_key_index(k, func(gb.get_group(k), *args, **kwargs)) | ||
for k in gb.groups.keys()]) | ||
elif isinstance(result, pd.Series): | ||
if isinstance(fn_input, pd.DataFrame): | ||
# DataFrameGroupBy | ||
dtype = pd.Series([result]).dtype | ||
proxy = pd.DataFrame(columns=result.index, | ||
dtype=result.dtype, | ||
index=self._ungrouped.proxy().index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah it's a bit surprising. It turns out in this case pandas transposes the Series - it's index values become the columns.
Compare this to the case where
I'll add a comment clarifying this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the example! Now it better paints the picture on how we construct the proxies. Figuring this out must require playing around with a lot of different DF/Series examples 😆 |
||
elif isinstance(fn_input, pd.Series): | ||
# SeriesGroupBy | ||
proxy = pd.Series(dtype=result.dtype, | ||
name=result.name, | ||
index=index_to_arrays(self._ungrouped.proxy().index) + | ||
index_to_arrays(result[:0].index)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you help me better understand the logic under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added some comments to explain both of these cases (and examples in the next comment). Does that help? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is clear now! |
||
else: | ||
# The user fn returns some non-pandas type. The expected result is a | ||
# Series where each element is the result of one user fn call. | ||
dtype = pd.Series([result]).dtype | ||
proxy = pd.Series([], dtype=dtype, index=self._ungrouped.proxy().index) | ||
|
||
|
||
def do_partition_apply(df): | ||
# Remove columns from index, we only needed them there for partitioning | ||
df = df.reset_index(grouping_columns, drop=True) | ||
|
@@ -4017,7 +4076,8 @@ def do_partition_apply(df): | |
by=grouping_columns or None) | ||
|
||
gb = project(gb) | ||
return gb.apply(func, *args, **kwargs) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: extra space There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On this topic, what's Beam's guidance on flexibility with Python styling? I'm running formatting/linting on the commit hooks; they don't seem too strict or anything, and I don't want to focus on this too much. I suppose everyone will always impart some of his personalities to the code over time. 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deleted! Generally, anything that passes the PythonLint PreCommit (which runs pylint and yapf checkers) is fine. That's not as opinionated as some checkers (e.g. black), so it does leave a decent amount of wiggle room and weird things can slip in like this whitespace change. It's reasonable to point out anything like this that looks odd to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SG. I'm used to working in black, but don't really mind as long as we have a checker |
||
return do_apply(gb) | ||
|
||
return DeferredDataFrame( | ||
expressions.ComputedExpression( | ||
|
@@ -4117,8 +4177,15 @@ def apply_fn(df): | |
@property # type: ignore | ||
@frame_base.with_docs_from(DataFrameGroupBy) | ||
def dtypes(self): | ||
grouping_columns = self._grouping_columns | ||
return self.apply(lambda df: df.drop(grouping_columns, axis=1).dtypes) | ||
return frame_base.DeferredFrame.wrap( | ||
expressions.ComputedExpression( | ||
'dtypes', | ||
lambda gb: gb.dtypes, | ||
[self._expr], | ||
requires_partition_by=partitionings.Arbitrary(), | ||
preserves_partition_by=partitionings.Arbitrary() | ||
) | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It turns out the old implementation was relying on incorrect behavior in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
|
||
fillna = frame_base.wont_implement_method( | ||
DataFrameGroupBy, 'fillna', explanation=( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here.