Skip to content

Commit

Permalink
[BEAM-13605] Add support for pandas 1.4.0 (#16590)
Browse files Browse the repository at this point in the history
* Addding new functions to / fixing doctests

* Add _rename and value_counts()

* Move import statement

* Add if DataFrame has value_counts attr

* Fix typo

* Update precommit script and setup.py to 1.4

* Add backwards compatability for rename and replace

* Add docstring and simplify kwargs

* Skip DataFrame construction with series

* Add change to CHANGES.md

* Skip failing pyarrow test

* Add pandas 1.4 to tox.ini
  • Loading branch information
yeandy authored Feb 3, 2022
1 parent 6814a06 commit 5beae2a
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 5 deletions.
18 changes: 18 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
# [2.37.0] - Unreleased

## Highlights

## I/Os

## New Features / Improvements

* DataFrame API now supports pandas 1.4.x ([BEAM-13605](https://issues.apache.org/jira/browse/BEAM-13605)).

## Breaking Changes

## Deprecations

## Bugfixes

## Known Issues

# 2.36.0 - Unreleased

## Highlights
Expand Down
27 changes: 25 additions & 2 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import numpy as np
import pandas as pd
from pandas._libs import lib
from pandas.core.groupby.generic import DataFrameGroupBy

from apache_beam.dataframe import expressions
Expand Down Expand Up @@ -638,10 +639,13 @@ def replace(self, to_replace, value, limit, method, **kwargs):
order-sensitive. It cannot be specified.
If ``limit`` is specified this operation is not parallelizable."""
value_compare = None if PD_VERSION < (1, 4) else lib.no_default
if method is not None and not isinstance(to_replace,
dict) and value is None:
dict) and value is value_compare:
# pandas only relies on method if to_replace is not a dictionary, and
# value is None
# value is the <no_default> value. This is different than
# if ``None`` is explicitly passed for ``value``. In this case, it will be
# respected
raise frame_base.WontImplementError(
f"replace(method={method!r}) is not supported because it is "
"order sensitive. Only replace(method=None) is supported.",
Expand Down Expand Up @@ -1318,6 +1322,9 @@ def align(self, other, join, axis, level, method, **kwargs):
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Singleton())

info = frame_base.wont_implement_method(
pd.Series, 'info', reason="non-deferred-result")

def _idxmaxmin_helper(self, op, **kwargs):
if op == 'idxmax':
func = pd.Series.idxmax
Expand Down Expand Up @@ -4191,6 +4198,20 @@ def dtypes(self):
)
)

if hasattr(DataFrameGroupBy, 'value_counts'):
@frame_base.with_docs_from(DataFrameGroupBy)
def value_counts(self, **kwargs):
"""
DataFrameGroupBy.value_counts() is the same as DataFrame.value_counts()
"""
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'value_counts',
lambda df: df.value_counts(**kwargs), [self._expr],
preserves_partition_by=partitionings.Arbitrary(),
requires_partition_by=partitionings.Arbitrary())
)

fillna = frame_base.wont_implement_method(
DataFrameGroupBy, 'fillna', explanation=(
"df.fillna() should be used instead. Only method=None is supported "
Expand Down Expand Up @@ -4735,6 +4756,8 @@ def repeat(self, repeats):
'match',
'pad',
'partition',
'removeprefix',
'removesuffix',
'replace',
'rpartition',
'rstrip',
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/dataframe/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import pandas as pd
import pandas.testing
import pyarrow
import pytest
from pandas.testing import assert_frame_equal
from parameterized import parameterized
Expand All @@ -40,6 +41,10 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
PYARROW_VERSION = tuple(map(int, pyarrow.__version__.split('.')[0:2]))


class SimpleRow(typing.NamedTuple):
value: int
Expand Down Expand Up @@ -101,6 +106,9 @@ def test_read_write_csv(self):
set(self.read_all_lines(output + 'out.csv*')))

@pytest.mark.uses_pyarrow
@unittest.skipIf(
PD_VERSION >= (1, 4) and PYARROW_VERSION < (1, 0),
"pandas 1.4 requires at least pyarrow 1.0.1")
def test_read_write_parquet(self):
self._run_read_write_test(
'parquet', {}, {}, dict(check_index=False), ['pyarrow'])
Expand Down
34 changes: 33 additions & 1 deletion sdks/python/apache_beam/dataframe/pandas_doctests_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def test_ndframe_tests(self):
'pandas.core.generic.NDFrame.replace': [
"s.replace([1, 2], method='bfill')",
# Relies on method='pad'
"s.replace('a')",
# Relies on method='pad'
# value=None is not valid for pandas < 1.4
"s.replace('a', None)",
# Implicitly uses method='pad', but output doesn't rely on that
# behavior. Verified indepently in
Expand Down Expand Up @@ -96,6 +99,7 @@ def test_ndframe_tests(self):
'pandas.core.generic.NDFrame.infer_objects': ['*'],
'pandas.core.generic.NDFrame.ewm': ['*'],
'pandas.core.generic.NDFrame.expanding': ['*'],
'pandas.core.generic.NDFrame.get': ['*'],
},
not_implemented_ok={
'pandas.core.generic.NDFrame.asof': ['*'],
Expand All @@ -121,6 +125,7 @@ def test_ndframe_tests(self):
'pandas.core.generic.NDFrame.convert_dtypes': ['*'],
'pandas.core.generic.NDFrame.copy': ['*'],
'pandas.core.generic.NDFrame.droplevel': ['*'],
'pandas.core.generic.NDFrame.get': ['*'],
'pandas.core.generic.NDFrame.rank': [
# Modified dataframe
'df'
Expand All @@ -133,6 +138,15 @@ def test_ndframe_tests(self):
# pandas doctests only verify the type of exception
'df.rename(2)'
],
# For pandas >= 1.4, rename is changed to _rename
'pandas.core.generic.NDFrame._rename': [
# Seems to be an upstream bug. The actual error has a different
# message:
# TypeError: Index(...) must be called with a collection of
# some kind, 2 was passed
# pandas doctests only verify the type of exception
'df.rename(2)'
],
# Tests rely on setting index
'pandas.core.generic.NDFrame.rename_axis': ['*'],
# Raises right exception, but testing framework has matching issues.
Expand Down Expand Up @@ -191,6 +205,9 @@ def test_dataframe_tests(self):
'pandas.core.frame.DataFrame.replace': [
"s.replace([1, 2], method='bfill')",
# Relies on method='pad'
"s.replace('a')",
# Relies on method='pad'
# value=None is not valid for pandas < 1.4
"s.replace('a', None)",
# Implicitly uses method='pad', but output doesn't rely on that
# behavior. Verified indepently in
Expand Down Expand Up @@ -288,6 +305,12 @@ def test_dataframe_tests(self):
],
},
skip={
# DataFrame construction from a dictionary and
# Series requires using the len() function, which
# is a non-deferred operation that we do not allow
'pandas.core.frame.DataFrame': [
'pd.DataFrame(data=d, index=[0, 1, 2, 3])',
],
# s2 created with reindex
'pandas.core.frame.DataFrame.dot': [
'df.dot(s2)',
Expand Down Expand Up @@ -421,6 +444,7 @@ def test_series_tests(self):
'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
'pandas.core.series.Series.info': ['*'],
'pandas.core.series.Series.items': ['*'],
'pandas.core.series.Series.iteritems': ['*'],
# default keep is 'first'
Expand Down Expand Up @@ -453,6 +477,9 @@ def test_series_tests(self):
'pandas.core.series.Series.replace': [
"s.replace([1, 2], method='bfill')",
# Relies on method='pad'
"s.replace('a')",
# Relies on method='pad'
# value=None is not valid for pandas < 1.4
"s.replace('a', None)",
# Implicitly uses method='pad', but output doesn't rely on that
# behavior. Verified indepently in
Expand Down Expand Up @@ -717,7 +744,6 @@ def test_groupby_tests(self):
not_implemented_ok={
'pandas.core.groupby.generic.DataFrameGroupBy.idxmax': ['*'],
'pandas.core.groupby.generic.DataFrameGroupBy.idxmin': ['*'],
'pandas.core.groupby.generic.DataFrameGroupBy.value_counts': ['*'],
'pandas.core.groupby.generic.SeriesGroupBy.transform': ['*'],
'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['*'],
'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['*'],
Expand Down Expand Up @@ -749,6 +775,12 @@ def test_groupby_tests(self):
# Skipped idxmax/idxmin due an issue with the test framework
'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['s.idxmin()'],
'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['s.idxmax()'],
# Uses as_index, which is currently not_implemented
'pandas.core.groupby.generic.DataFrameGroupBy.value_counts': [
"df.groupby('gender', as_index=False).value_counts()",
# pylint: disable=line-too-long
"df.groupby('gender', as_index=False).value_counts(normalize=True)",
],
})
self.assertEqual(result.failed, 0)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def run(self):
'interactive_test': INTERACTIVE_BEAM_TEST,
'aws': AWS_REQUIREMENTS,
'azure': AZURE_REQUIREMENTS,
'dataframe': ['pandas>=1.0,<1.4']
'dataframe': ['pandas>=1.0,<1.5']
},
zip_safe=False,
# PyPI package information.
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/test-suites/tox/py38/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ toxTask "testPy38pandas-13", "py38-pandas-13"
test.dependsOn "testPy38pandas-13"
preCommitPy38.dependsOn "testPy38pandas-13"

toxTask "testPy38pandas-14", "py38-pandas-14"
test.dependsOn "testPy38pandas-14"
preCommitPy38.dependsOn "testPy38pandas-14"

toxTask "whitespacelint", "whitespacelint"

task archiveFilesToLint(type: Zip) {
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,12 @@ commands =
# selecting tests with -m (BEAM-12985)
pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_pyarrow {posargs}

[testenv:py{36,37,38}-pandas-{11,12,13}]
[testenv:py{36,37,38}-pandas-{11,12,13,14}]
deps =
11: pandas>=1.1.0,<1.2.0
12: pandas>=1.2.0,<1.3.0
13: pandas>=1.3.0,<1.4.0
14: pandas>=1.4.0,<1.5.0
commands =
# Log pandas and numpy version for debugging
/bin/sh -c "pip freeze | grep -E '(pandas|numpy)'"
Expand Down

0 comments on commit 5beae2a

Please sign in to comment.