Skip to content

Commit

Permalink
[BEAM-13947] Add split() and rsplit(), non-deferred column operations…
Browse files Browse the repository at this point in the history
… on categorical columns (apache#16677)

* Add split/rsplit; Need to refactor regex

* Support Regex; Refactor tests

* Remove debugger

* fix grammar

* Fix passing regex arg

* Reorder imports

* Address PR comments; Simplify kwargs

* Simplify getting columns for split_cat

* Update doctests to skip expand=True operations

* Fix missing doctest
  • Loading branch information
yeandy authored and nancyxu123 committed Mar 9, 2022
1 parent 00dd9fb commit b402d5e
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 9 deletions.
71 changes: 65 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -4765,13 +4765,72 @@ def get_dummies(self, **kwargs):
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()))

split = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'split',
reason='non-deferred-columns')
def _split_helper(self, rsplit=False, **kwargs):
expand = kwargs.get('expand', False)

rsplit = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'rsplit',
reason='non-deferred-columns')
if not expand:
# Not creating separate columns
proxy = self._expr.proxy()
if not rsplit:
func = lambda s: pd.concat([proxy, s.str.split(**kwargs)])
else:
func = lambda s: pd.concat([proxy, s.str.rsplit(**kwargs)])
else:
# Creating separate columns, so must be more strict on dtype
dtype = self._expr.proxy().dtype
if not isinstance(dtype, pd.CategoricalDtype):
method_name = 'rsplit' if rsplit else 'split'
raise frame_base.WontImplementError(
f"{method_name}() of non-categorical type is not supported because "
"the type of the output column depends on the data. Please use "
"pd.CategoricalDtype with explicit categories.",
reason="non-deferred-columns")

# Split the categories
split_cats = dtype.categories.str.split(**kwargs)

# Count the number of new columns to create for proxy
max_splits = len(max(split_cats, key=len))
proxy = pd.DataFrame(columns=range(max_splits))

def func(s):
if not rsplit:
result = s.str.split(**kwargs)
else:
result = s.str.rsplit(**kwargs)
result[~result.isna()].replace(np.nan, value=None)
return result

return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'split',
func,
[self._expr],
proxy=proxy,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
@frame_base.populate_defaults(pd.core.strings.StringMethods)
def split(self, **kwargs):
"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time, dtype can be `str`.
"""
return self._split_helper(rsplit=False, **kwargs)

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
@frame_base.populate_defaults(pd.core.strings.StringMethods)
def rsplit(self, **kwargs):
"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time, dtype can be `str`.
"""
return self._split_helper(rsplit=True, **kwargs)


ELEMENTWISE_STRING_METHODS = [
Expand Down
112 changes: 112 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import unittest

import numpy as np
Expand Down Expand Up @@ -2356,6 +2357,117 @@ def test_sample_with_weights_distribution(self):
expected = num_samples * target_prob
self.assertTrue(expected / 3 < result < expected * 2, (expected, result))

def test_split_pandas_examples_no_expand(self):
# if expand=False (default), then no need to cast dtype to be
# CategoricalDtype.
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
result = self._evaluate(lambda s: s.str.split(), s)
self.assert_frame_data_equivalent(result, s.str.split())

result = self._evaluate(lambda s: s.str.rsplit(), s)
self.assert_frame_data_equivalent(result, s.str.rsplit())

result = self._evaluate(lambda s: s.str.split(n=2), s)
self.assert_frame_data_equivalent(result, s.str.split(n=2))

result = self._evaluate(lambda s: s.str.rsplit(n=2), s)
self.assert_frame_data_equivalent(result, s.str.rsplit(n=2))

result = self._evaluate(lambda s: s.str.split(pat="/"), s)
self.assert_frame_data_equivalent(result, s.str.split(pat="/"))

def test_split_pandas_examples_expand_not_categorical(self):
# When expand=True, there is exception because series is not categorical
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"split\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.split(expand=True), s)

with self.assertRaisesRegex(
frame_base.WontImplementError,
r"rsplit\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.rsplit(expand=True), s)

def test_split_pandas_examples_expand_pat_is_string_literal1(self):
# When expand=True and pattern is treated as a string literal
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
s = s.astype(
pd.CategoricalDtype(
categories=[
'this is a regular sentence',
'https://docs.python.org/3/tutorial/index.html'
]))
result = self._evaluate(lambda s: s.str.split(expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(expand=True))

result = self._evaluate(lambda s: s.str.rsplit("/", n=1, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.rsplit("/", n=1, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_string_literal2(self):
# when regex is None (default) regex pat is string literal if len(pat) == 1
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r".", expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(r".", expand=True))

# When regex=False, pat is interpreted as the string itself
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=False, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=False, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_regex(self):
# when regex is None (default) regex pat is compiled if len(pat) != 1
s = pd.Series(["foo and bar plus baz"])
s = s.astype(pd.CategoricalDtype(categories=["foo and bar plus baz"]))
result = self._evaluate(lambda s: s.str.split(r"and|plus", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"and|plus", expand=True))

s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r"\.jpg", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", expand=True))

# When regex=True, pat is interpreted as a regex
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=True))

# A compiled regex can be passed as pat
result = self._evaluate(
lambda s: s.str.split(re.compile(r"\.jpg"), expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(re.compile(r"\.jpg"), expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pat_is_regex(self):
# regex=True, but expand=False
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=False), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=False))


class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
Expand Down
29 changes: 26 additions & 3 deletions sdks/python/apache_beam/dataframe/pandas_doctests_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,6 @@ def test_string_tests(self):
f'{module_name}.StringMethods.get_dummies': ['*'],
f'{module_name}.str_get_dummies': ['*'],
f'{module_name}.StringMethods': ['s.str.split("_")'],
f'{module_name}.StringMethods.rsplit': ['*'],
f'{module_name}.StringMethods.split': ['*'],
},
skip={
# count() on Series with a NaN produces mismatched type if we
Expand All @@ -602,7 +600,32 @@ def test_string_tests(self):
],

# output has incorrect formatting in 1.2.x
f'{module_name}.StringMethods.extractall': ['*']
f'{module_name}.StringMethods.extractall': ['*'],

# For split and rsplit, if expand=True, then the series
# must be of CategoricalDtype, which pandas doesn't convert to
f'{module_name}.StringMethods.rsplit': [
's.str.split(r"\\+|=", expand=True)', # for pandas<1.4
's.str.split(expand=True)',
's.str.rsplit("/", n=1, expand=True)',
's.str.split(r"and|plus", expand=True)',
's.str.split(r".", expand=True)',
's.str.split(r"\\.jpg", expand=True)',
's.str.split(r"\\.jpg", regex=True, expand=True)',
's.str.split(re.compile(r"\\.jpg"), expand=True)',
's.str.split(r"\\.jpg", regex=False, expand=True)'
],
f'{module_name}.StringMethods.split': [
's.str.split(r"\\+|=", expand=True)', # for pandas<1.4
's.str.split(expand=True)',
's.str.rsplit("/", n=1, expand=True)',
's.str.split(r"and|plus", expand=True)',
's.str.split(r".", expand=True)',
's.str.split(r"\\.jpg", expand=True)',
's.str.split(r"\\.jpg", regex=True, expand=True)',
's.str.split(re.compile(r"\\.jpg"), expand=True)',
's.str.split(r"\\.jpg", regex=False, expand=True)'
]
})
self.assertEqual(result.failed, 0)

Expand Down

0 comments on commit b402d5e

Please sign in to comment.