Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13947] Add split() and rsplit(), non-deferred column operations on categorical columns #16677

Merged
merged 10 commits into from
Mar 4, 2022
90 changes: 84 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -4721,13 +4721,91 @@ def repeat(self, repeats):
pd.core.strings.StringMethods, 'get_dummies',
reason='non-deferred-columns')

split = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'split',
reason='non-deferred-columns')
def _split_helper(
self, rsplit=False, pat=None, expand=False, regex=None, **kwargs):

# Adding arguments to kwargs. regex introduced in pandas 1.4
# but only for split, not rsplit
kwargs['pat'] = pat
kwargs['expand'] = expand
if PD_VERSION >= (1, 4) and not rsplit:
kwargs['regex'] = regex
yeandy marked this conversation as resolved.
Show resolved Hide resolved

if not expand:
# Not creating separate columns
proxy = self._expr.proxy()
func = lambda s: pd.concat([proxy,
(s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))]
yeandy marked this conversation as resolved.
Show resolved Hide resolved
)
else:
# Creating separate columns, so must be more strict on dtype
dtype = self._expr.proxy().dtype
if not isinstance(dtype, pd.CategoricalDtype):
method_name = 'rsplit' if rsplit else 'split'
raise frame_base.WontImplementError(
method_name + "() of non-categorical type is not supported because "
yeandy marked this conversation as resolved.
Show resolved Hide resolved
"the type of the output column depends on the data. Please use "
"pd.CategoricalDtype with explicit categories.",
reason="non-deferred-columns")

rsplit = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'rsplit',
reason='non-deferred-columns')
if regex is False or (
regex is None and (
(not pat) or (isinstance(pat, str) and len(pat) == 1)
)
):
# Treat pat as literal string
split_cats = [
cat.split(
sep=kwargs.get('pat'),
maxsplit=kwargs.get('n', -1)
) for cat in dtype.categories
]
else:
# Treat pat as regex
split_cats = [
re.split(
pattern=pat,
string=cat,
maxsplit=kwargs.get('n', 0)
) for cat in dtype.categories
]
yeandy marked this conversation as resolved.
Show resolved Hide resolved

max_splits = len(max(split_cats, key=len))
proxy = pd.DataFrame(columns=range(max_splits))

func = lambda s: pd.concat([proxy,
(s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))]
).replace(np.nan, value=None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an entry in a series is np.nan, and then is converted to dtype CategoricalDtype, then pandas behavior is to propogate the NaN. Example:

>>>s = pd.Series(
    [
        "this is a regular sentence",
        "https://docs.python.org/3/tutorial/index.html",
        np.nan
    ]
)
>>>s.str.split(expand=True)
                                               0     1     2        3         4
0                                           this    is     a  regular  sentence
1  https://docs.python.org/3/tutorial/index.html  None  None     None      None
2                                            NaN   NaN   NaN      NaN       NaN

In row 1, where which the string does not get split, in order to propagate None into other columns, I do .replace(np.nan, value=None). However this makes row 2 be all None instead of NaN.

Is there a way to only choose specific rows to be NaN and not None? i.e. use s.isna() to find those indices?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this should be doable, I'm not sure if it can be done in a one-liner though. You could define a multi-line function, something like

def func(s):
  result = s.str.split(**kwargs)
  s[~s.isna()].str.replace(np.nan, value=None, inplace=True)
  return result

Making it multi-line might improve readability anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! I was playing around with very similar logic to this, and was running into issues passing indices ~s.isna() into the original series and doing subsequent updates to the result. So thought that it wasn't possible. I guess it there was some sort of oversight of mine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mm well it's certainly possible I'm missing something, but I think this should work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wasn't clear. The code you have suggested works (just pushed a commit with it). My point was that I was trying to do something similar -- with logic that sort of looked like yours (though I don't remember exactly how it looked) -- but for some reason I was having issue.


return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'split',
func,
[self._expr],
proxy=proxy,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
yeandy marked this conversation as resolved.
Show resolved Hide resolved
def split(self, pat=None, expand=False, regex=None, **kwargs):
"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time, dtype can be `str`.
"""
return self._split_helper(
rsplit=False, pat=pat, expand=expand, regex=regex, **kwargs)

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
def rsplit(self, pat=None, expand=False, **kwargs):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rsplit API does not support regex as an argument even though its documentation does. This is probably not a bug, but rather documentation inconsistency. I can file an issue w/ pandas.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's file an issue with pandas about this, if you want to get a commit in pandas I think it's a valid use of your time to contribute a fix too. If not I'm sure someone will pick it up, it's a very active community.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was filed a while time ago, but no one has picked it up. I'll assign it to myself to do. In the meantime, I can add a subtask (under the non-deferred ticket?) to remind us to update the function api once the rsplit fix in pandas is released.

"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time, dtype can be `str`.
"""
return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs)


ELEMENTWISE_STRING_METHODS = [
Expand Down
112 changes: 112 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import unittest

import numpy as np
Expand Down Expand Up @@ -2247,6 +2248,117 @@ def test_sample_with_weights_distribution(self):
expected = num_samples * target_prob
self.assertTrue(expected / 3 < result < expected * 2, (expected, result))

def test_split_pandas_examples_no_expand(self):
# if expand=False (default), then no need to cast dtype to be
# CategoricalDtype.
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
result = self._evaluate(lambda s: s.str.split(), s)
self.assert_frame_data_equivalent(result, s.str.split())

result = self._evaluate(lambda s: s.str.rsplit(), s)
self.assert_frame_data_equivalent(result, s.str.rsplit())

result = self._evaluate(lambda s: s.str.split(n=2), s)
self.assert_frame_data_equivalent(result, s.str.split(n=2))

result = self._evaluate(lambda s: s.str.rsplit(n=2), s)
self.assert_frame_data_equivalent(result, s.str.rsplit(n=2))

result = self._evaluate(lambda s: s.str.split(pat="/"), s)
self.assert_frame_data_equivalent(result, s.str.split(pat="/"))

def test_split_pandas_examples_expand_not_categorical(self):
# When expand=True, there is exception because series is not categorical
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"split\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.split(expand=True), s)

with self.assertRaisesRegex(
frame_base.WontImplementError,
r"rsplit\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.rsplit(expand=True), s)

def test_split_pandas_examples_expand_pat_is_string_literal1(self):
# When expand=True and pattern is treated as a string literal
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
s = s.astype(
pd.CategoricalDtype(
categories=[
'this is a regular sentence',
'https://docs.python.org/3/tutorial/index.html'
]))
result = self._evaluate(lambda s: s.str.split(expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(expand=True))

result = self._evaluate(lambda s: s.str.rsplit("/", n=1, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.rsplit("/", n=1, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_string_literal2(self):
# when regex is None (default) regex pat is string literal if len(pat) == 1
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r".", expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(r".", expand=True))

# When regex=False, pat is interpreted as the string itself
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=False, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=False, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_regex(self):
# when regex is None (default) regex pat is compiled if len(pat) != 1
s = pd.Series(["foo and bar plus baz"])
s = s.astype(pd.CategoricalDtype(categories=["foo and bar plus baz"]))
result = self._evaluate(lambda s: s.str.split(r"and|plus", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"and|plus", expand=True))

s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r"\.jpg", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", expand=True))

# When regex=True, pat is interpreted as a regex
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=True))

# A compiled regex can be passed as pat
result = self._evaluate(
lambda s: s.str.split(re.compile(r"\.jpg"), expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(re.compile(r"\.jpg"), expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pat_is_regex(self):
# regex=True, but expand=False
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=False), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=False))


class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
Expand Down