Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13947] Add split() and rsplit(), non-deferred column operations on categorical columns #16677

Merged
merged 10 commits into from
Mar 4, 2022
81 changes: 75 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -4721,13 +4721,82 @@ def repeat(self, repeats):
pd.core.strings.StringMethods, 'get_dummies',
reason='non-deferred-columns')

split = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'split',
reason='non-deferred-columns')
def _split_helper(
self, rsplit=False, pat=None, expand=False, regex=None, **kwargs):
if not expand:
# Not creating separate columns
proxy = self._expr.proxy()
func = lambda s: pd.concat([proxy,
(s.str.split(pat=pat, expand=expand, regex=regex, **kwargs)
if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))]
)
else:
# Creating separate columns, so must be more strict on dtype
dtype = self._expr.proxy().dtype
if not isinstance(dtype, pd.CategoricalDtype):
method_name = 'rsplit' if rsplit else 'split'
raise frame_base.WontImplementError(
method_name + "() of non-categorical type is not supported because "
yeandy marked this conversation as resolved.
Show resolved Hide resolved
"the type of the output column depends on the data. Please use "
"pd.CategoricalDtype with explicit categories.",
reason="non-deferred-columns")

rsplit = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'rsplit',
reason='non-deferred-columns')
if regex is False or (
regex is None and isinstance(pat, str) and len(pat) == 1):
# Treat pat as literal string
split_cats = [
cat.split(
sep=kwargs.get('pat'),
maxsplit=kwargs.get('n', -1)
) for cat in dtype.categories
]
else:
# Treat pat as regex
split_cats = [
re.split(
pattern=pat,
string=cat,
maxsplit=kwargs.get('n', 0)
) for cat in dtype.categories
]
yeandy marked this conversation as resolved.
Show resolved Hide resolved

max_splits = len(max(split_cats, key=len))
proxy = pd.DataFrame(columns=range(max_splits))

func = lambda s: pd.concat([proxy,
(s.str.split(pat=pat, expand=expand, regex=regex, **kwargs)
if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))]
).replace(np.nan, value=None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an entry in a series is np.nan, and then is converted to dtype CategoricalDtype, then pandas behavior is to propogate the NaN. Example:

>>>s = pd.Series(
    [
        "this is a regular sentence",
        "https://docs.python.org/3/tutorial/index.html",
        np.nan
    ]
)
>>>s.str.split(expand=True)
                                               0     1     2        3         4
0                                           this    is     a  regular  sentence
1  https://docs.python.org/3/tutorial/index.html  None  None     None      None
2                                            NaN   NaN   NaN      NaN       NaN

In row 1, where which the string does not get split, in order to propagate None into other columns, I do .replace(np.nan, value=None). However this makes row 2 be all None instead of NaN.

Is there a way to only choose specific rows to be NaN and not None? i.e. use s.isna() to find those indices?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this should be doable, I'm not sure if it can be done in a one-liner though. You could define a multi-line function, something like

def func(s):
  result = s.str.split(**kwargs)
  s[~s.isna()].str.replace(np.nan, value=None, inplace=True)
  return result

Making it multi-line might improve readability anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! I was playing around with very similar logic to this, and was running into issues passing indices ~s.isna() into the original series and doing subsequent updates to the result. So thought that it wasn't possible. I guess it there was some sort of oversight of mine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mm well it's certainly possible I'm missing something, but I think this should work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wasn't clear. The code you have suggested works (just pushed a commit with it). My point was that I was trying to do something similar -- with logic that sort of looked like yours (though I don't remember exactly how it looked) -- but for some reason I was having issue.


return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'split',
func,
[self._expr],
proxy=proxy,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
yeandy marked this conversation as resolved.
Show resolved Hide resolved
def split(self, pat=None, expand=False, regex=None, **kwargs):
"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time dtype can be `str`.
"""
return self._split_helper(
rsplit=False, pat=pat, expand=expand, regex=regex, **kwargs)

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
def rsplit(self, pat=None, expand=False, **kwargs):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rsplit API does not support regex as an argument even though its documentation does. This is probably not a bug, but rather documentation inconsistency. I can file an issue w/ pandas.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's file an issue with pandas about this, if you want to get a commit in pandas I think it's a valid use of your time to contribute a fix too. If not I'm sure someone will pick it up, it's a very active community.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was filed a while time ago, but no one has picked it up. I'll assign it to myself to do. In the meantime, I can add a subtask (under the non-deferred ticket?) to remind us to update the function api once the rsplit fix in pandas is released.

"""
Like other non-deferred methods, dtype must be CategoricalDtype.
One exception is when ``expand`` is ``False``. Because we are not
creating new columns at construction time dtype can be `str`.
"""
return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs)


ELEMENTWISE_STRING_METHODS = [
Expand Down
112 changes: 112 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import numpy as np
import pandas as pd
import re
from parameterized import parameterized

import apache_beam as beam
Expand Down Expand Up @@ -2247,6 +2248,117 @@ def test_sample_with_weights_distribution(self):
expected = num_samples * target_prob
self.assertTrue(expected / 3 < result < expected * 2, (expected, result))

def test_split_pandas_examples_no_expand(self):
# if expand=False (default), then no need to cast dtype to be
# CategoricalDtype.
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
result = self._evaluate(lambda s: s.str.split(), s)
self.assert_frame_data_equivalent(result, s.str.split())

result = self._evaluate(lambda s: s.str.rsplit(), s)
self.assert_frame_data_equivalent(result, s.str.rsplit())

result = self._evaluate(lambda s: s.str.split(n=2), s)
self.assert_frame_data_equivalent(result, s.str.split(n=2))

result = self._evaluate(lambda s: s.str.rsplit(n=2), s)
self.assert_frame_data_equivalent(result, s.str.rsplit(n=2))

result = self._evaluate(lambda s: s.str.split(pat="/"), s)
self.assert_frame_data_equivalent(result, s.str.split(pat="/"))

def test_split_pandas_examples_expand_not_categorical(self):
# When expand=True, there is exception because series is not categorical
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"split\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.split(expand=True), s)

with self.assertRaisesRegex(
frame_base.WontImplementError,
r"rsplit\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.rsplit(expand=True), s)

def test_split_pandas_examples_expand_pat_is_string_literal1(self):
# When expand=True and pattern is treated as a string literal
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
s = s.astype(
pd.CategoricalDtype(
categories=[
'this is a regular sentence',
'https://docs.python.org/3/tutorial/index.html'
]))
result = self._evaluate(lambda s: s.str.split(expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(expand=True))

result = self._evaluate(lambda s: s.str.rsplit("/", n=1, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.rsplit("/", n=1, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_string_literal2(self):
# when regex is None (default) regex pat is string literal if len(pat) == 1
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r".", expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(r".", expand=True))

# When regex=False, pat is interpreted as the string itself
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=False, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=False, expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_regex(self):
# when regex is None (default) regex pat is compiled if len(pat) != 1
s = pd.Series(["foo and bar plus baz"])
s = s.astype(pd.CategoricalDtype(categories=["foo and bar plus baz"]))
result = self._evaluate(lambda s: s.str.split(r"and|plus", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"and|plus", expand=True))

s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r"\.jpg", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", expand=True))

# When regex=True, pat is interpreted as a regex
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=True))

# A compiled regex can be passed as pat
result = self._evaluate(
lambda s: s.str.split(re.compile(r"\.jpg"), expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(re.compile(r"\.jpg"), expand=True))

@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pat_is_regex(self):
# My own one: regex, but expand=False
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=False), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=False))


class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def check_correct(expected, actual):


def concat(parts):
# import pdb; pdb.set_trace()
if len(parts) > 1:
return pd.concat(parts)
elif len(parts) == 1:
Expand Down Expand Up @@ -81,7 +82,7 @@ def run_scenario(self, input, func):
input_deferred = frame_base.DeferredFrame.wrap(input_placeholder)
actual_deferred = func(input_deferred)._expr.evaluate_at(
expressions.Session({input_placeholder: input}))

# import pdb; pdb.set_trace()
check_correct(expected, actual_deferred)

with beam.Pipeline() as p:
Expand Down