
[BEAM-13605] Modify groupby.apply implementation in preparation for pandas 1.4.0 #16706

Merged: 3 commits merged into apache:master on Feb 3, 2022

Conversation

TheNeuralBit (Member):

pandas 1.4.0 changes the logic used for transform detection, which breaks our implementation of apply. Specifically, detecting transforms via index equality does not work for Beam DataFrames, since it would require observing the actual index values at pipeline construction time. In addition, the fact that pandas performs this detection breaks some of our internal usages: some partitions can have equal indexes while others do not, so results differ between partitions.

This PR modifies our implementation of apply so that it does not defer to pandas' apply when its transform detection would lead to inconsistent results. It also documents this new divergence from pandas (i.e. our apply implementation will never detect a transform via index equality).
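For context, here is a rough illustration of the pandas behavior in question (a hedged sketch; the exact result shape depends on the pandas version, which is exactly why it cannot be relied on when index values are only known at execution time):

import pandas as pd

df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 3]})

# func returns a Series with the same index as its input group, so pandas
# may treat it as a "transform" and index the combined result like the
# original frame, without prepending the group keys.
same_index = df.groupby('key').apply(lambda g: g['val'] * 2)

# func returns a Series with a clearly different index (the describe() stat
# names), so the group keys are prepended as an extra index level.
diff_index = df.groupby('key')['val'].apply(lambda s: s.describe())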


github-actions bot added the python label on Feb 2, 2022
return df


do_apply = lambda gb: pd.concat([add_key_index(k, func(gb.get_group(k), *args, **kwargs)) for k in gb.groups.keys()])
TheNeuralBit (Member, Author):

This is the critical change: when transform detection would break us, we override do_apply with a custom implementation that executes func over each group.
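In pure-pandas terms, the fallback is roughly equivalent to the following sketch (apply_per_group is a hypothetical name used only for illustration; the real code uses the add_key_index helper shown above, whereas this sketch prepends the group key via pd.concat's dict keys):

import pandas as pd

def apply_per_group(df, by, func, *args, **kwargs):
  # Evaluate func on each group explicitly and prepend the group key as an
  # index level, instead of relying on pandas' transform detection inside
  # GroupBy.apply.
  gb = df.groupby(by)
  return pd.concat(
      {key: func(gb.get_group(key), *args, **kwargs) for key in gb.groups})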

requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()
)
)
TheNeuralBit (Member, Author):

It turns out the old implementation was relying on incorrect behavior in apply, so I've updated this not to use apply.

yeandy (Contributor):

Nice

  return self.groupby(by).apply(
      lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset),
-                             columns=[None]))[None]
+                             columns=[None]))[None].droplevel(by)
TheNeuralBit (Member, Author):

Similarly here.
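For reference, a small sketch (with illustrative values) of what the added .droplevel(by) accomplishes: once apply always prepends the group keys as index levels, dropping that level restores the original row index.

import pandas as pd

s = pd.Series(
    [False, True, False],
    index=pd.MultiIndex.from_tuples(
        [('cup', 0), ('cup', 1), ('pack', 2)], names=['style', None]))

# Removing the group-key level leaves the original row index (0, 1, 2).
s.droplevel('style')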

TheNeuralBit (Member, Author):

I tested patching this on top of #16590, and verified all apache_beam.dataframe tests pass with pandas 1.4.0

TheNeuralBit (Member, Author):

R: @yeandy do you think you could review this?

codecov bot commented on Feb 2, 2022:

Codecov Report

Merging #16706 (7c9d59d) into master (c06a927) will increase coverage by 37.20%.
The diff coverage is 100.00%.

❗ Current head 7c9d59d differs from pull request most recent head 84cf262. Consider uploading reports for the commit 84cf262 to get more accurate results


@@             Coverage Diff             @@
##           master   #16706       +/-   ##
===========================================
+ Coverage   46.43%   83.64%   +37.20%     
===========================================
  Files         201      452      +251     
  Lines       19787    62161    +42374     
===========================================
+ Hits         9189    51992    +42803     
- Misses       9612    10169      +557     
+ Partials      986        0      -986     
Impacted Files Coverage Δ
sdks/python/apache_beam/dataframe/frames.py 94.97% <100.00%> (ø)
sdks/go/pkg/beam/core/runtime/exec/discard.go
sdks/go/pkg/beam/transforms/stats/count.go
sdks/go/pkg/beam/io/filesystem/filesystem.go
sdks/go/pkg/beam/core/graph/coder/row_encoder.go
...beam/core/runtime/harness/statecache/statecache.go
sdks/go/pkg/beam/transforms/top/top.go
sdks/go/pkg/beam/flatten.go
sdks/go/pkg/beam/pardo.go
...pkg/beam/core/runtime/xlangx/expansionx/process.go
... and 644 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

yeandy (Contributor) left a comment:


LGTM! Just added a few questions to clarify my understanding

@@ -4017,7 +4076,8 @@ def do_partition_apply(df):
by=grouping_columns or None)

gb = project(gb)
return gb.apply(func, *args, **kwargs)

yeandy (Contributor):

nit: extra space

yeandy (Contributor):

On this topic, what's Beam's guidance on flexibility with Python styling? I'm running formatting/linting on the commit hooks; they don't seem too strict or anything, and I don't want to focus on this too much. I suppose everyone will always impart some of their personality to the code over time. 😄

TheNeuralBit (Member, Author):

Deleted!

Generally, anything that passes the PythonLint PreCommit (which runs pylint and yapf checkers) is fine. That's not as opinionated as some checkers (e.g. black), so it does leave a decent amount of wiggle room and weird things can slip in like this whitespace change. It's reasonable to point out anything like this that looks odd to you.

yeandy (Contributor):

SG. I'm used to working in black, but don't really mind as long as we have a checker

sdks/python/apache_beam/dataframe/frames.py (outdated comment thread, resolved)
Comment on lines 4052 to 4064
elif isinstance(result, pd.Series):
  if isinstance(fn_input, pd.DataFrame):
    # DataFrameGroupBy
    dtype = pd.Series([result]).dtype
    proxy = pd.DataFrame(columns=result.index,
                         dtype=result.dtype,
                         index=self._ungrouped.proxy().index)
  elif isinstance(fn_input, pd.Series):
    # SeriesGroupBy
    proxy = pd.Series(dtype=result.dtype,
                      name=result.name,
                      index=index_to_arrays(self._ungrouped.proxy().index) +
                      index_to_arrays(result[:0].index))
yeandy (Contributor):

Can you help me better understand the logic under elif isinstance(result, pd.Series):, for both the if isinstance(fn_input, pd.DataFrame): and elif isinstance(fn_input, pd.Series): cases?

TheNeuralBit (Member, Author) commented on Feb 2, 2022:

I added some comments to explain both of these cases (and examples in the next comment). Does that help?

yeandy (Contributor):

Yes, this is clear now!

Comment on lines 4053 to 4058
if isinstance(fn_input, pd.DataFrame):
  # DataFrameGroupBy
  dtype = pd.Series([result]).dtype
  proxy = pd.DataFrame(columns=result.index,
                       dtype=result.dtype,
                       index=self._ungrouped.proxy().index)
yeandy (Contributor):

If fn_input is a pd.DataFrame, do we still want proxy to be of type pd.DataFrame even though the result is a pd.Series?

TheNeuralBit (Member, Author):

Yeah, it's a bit surprising. It turns out that in this case pandas transposes the Series: its index values become the columns.
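For reference, a DataFrame matching the printed output below can be built like this (reconstructed from the Out[3] display):

import pandas as pd

df = pd.DataFrame({
    'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
    'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
    'rating': [4.0, 4.0, 3.5, 15.0, 5.0],
})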

In [3]: df
Out[3]: 
     brand style  rating
0  Yum Yum   cup     4.0
1  Yum Yum   cup     4.0
2  Indomie   cup     3.5
3  Indomie  pack    15.0
4  Indomie  pack     5.0

In [5]: df.groupby('style').apply(lambda df: df.rating.describe())
Out[5]: 
rating  count       mean       std  min   25%   50%   75%   max
style                                                          
cup       3.0   3.833333  0.288675  3.5  3.75   4.0   4.0   4.0
pack      2.0  10.000000  7.071068  5.0  7.50  10.0  12.5  15.0

Compare this to the case where fn_input is a Series. In this case the output is still a Series:

In [6]: df.groupby('style').rating.apply(lambda s: s.describe())
Out[6]: 
style       
cup    count     3.000000
       mean      3.833333
       std       0.288675
       min       3.500000
       25%       3.750000
       50%       4.000000
       75%       4.000000
       max       4.000000
pack   count     2.000000
       mean     10.000000
       std       7.071068
       min       5.000000
       25%       7.500000
       50%      10.000000
       75%      12.500000
       max      15.000000
Name: rating, dtype: float64

I'll add a comment clarifying this

yeandy (Contributor):

Thanks for the example! It paints a much clearer picture of how we construct the proxies. Figuring this out must have required playing around with a lot of different DF/Series examples 😆

requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()
)
)
yeandy (Contributor):

Nice

TheNeuralBit merged commit 51e0e4e into apache:master on Feb 3, 2022