Skip to content

Commit

Permalink
Support args= in Series.apply (#9982)
Browse files Browse the repository at this point in the history
Closes #9598

A lot of code was moved around but also slightly tweaked, making the diff a little harder to parse. Here's a summary of the changes:

- `Series.apply` used to simply turn the incoming scalar lambda function into a row UDF and then turn itself into a dataframe and run the code as normal. Now, it does its own separate unique processing and pipes through `Frame._apply` instead.
- `pipeline.py` was separated out into `row_function.py` and `lambda_function.py` which contain whatever is specific to each type of UDF, whereas everything that was common to both was migrated to `utils.py` and generalized as much as possible.
- a `templates.py` area was created to hold all the templates and initializers needed to cat together the kernel that we need and a new template specific to series lambdas was created.
- The caching machinery was abstracted out into `compile_or_get` and this function now expects a python function object it can call that will produce the right kernel. `DataFrame` and `Series` decide which one to use at the top level API. 
- Moved `_apply` from `Frame` to `IndexedFrame`

Authors:
  - https://github.com/brandon-b-miller

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Michael Wang (https://github.com/isVoid)

URL: #9982
  • Loading branch information
brandon-b-miller authored Jan 28, 2022
1 parent 5257f34 commit 896564a
Show file tree
Hide file tree
Showing 10 changed files with 578 additions and 447 deletions.
5 changes: 2 additions & 3 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from cudf.core.multiindex import MultiIndex
from cudf.core.resample import DataFrameResampler
from cudf.core.series import Series
from cudf.core.udf.row_function import _get_row_kernel
from cudf.utils import applyutils, docutils, ioutils, queryutils, utils
from cudf.utils.docutils import copy_docstring
from cudf.utils.dtypes import (
Expand Down Expand Up @@ -3926,10 +3927,8 @@ def apply(
raise ValueError("The `raw` kwarg is not yet supported.")
if result_type is not None:
raise ValueError("The `result_type` kwarg is not yet supported.")
if kwargs:
raise ValueError("UDFs using **kwargs are not yet supported.")

return self._apply(func, *args)
return self._apply(func, _get_row_kernel, *args, **kwargs)

@applyutils.doc_apply()
def apply_rows(
Expand Down
34 changes: 0 additions & 34 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
)
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.join import Merge, MergeSemi
from cudf.core.udf.pipeline import compile_or_get, supported_cols_from_frame
from cudf.core.window import Rolling
from cudf.utils import ioutils
from cudf.utils.docutils import copy_docstring
Expand Down Expand Up @@ -1367,39 +1366,6 @@ def _quantiles(
result._copy_type_metadata(self)
return result

@annotate("APPLY", color="purple", domain="cudf_python")
def _apply(self, func, *args):
"""
Apply `func` across the rows of the frame.
"""
kernel, retty = compile_or_get(self, func, args)

# Mask and data column preallocated
ans_col = cupy.empty(len(self), dtype=retty)
ans_mask = cudf.core.column.column_empty(len(self), dtype="bool")
launch_args = [(ans_col, ans_mask), len(self)]
offsets = []

# if compile_or_get succeeds, it is safe to create a kernel that only
# consumes the columns that are of supported dtype
for col in supported_cols_from_frame(self).values():
data = col.data
mask = col.mask
if mask is None:
launch_args.append(data)
else:
launch_args.append((data, mask))
offsets.append(col.offset)
launch_args += offsets
launch_args += list(args)
kernel.forall(len(self))(*launch_args)

col = as_column(ans_col)
col.set_base_mask(libcudf.transform.bools_to_mask(ans_mask))
result = cudf.Series._from_data({None: col}, self._index)

return result

def rank(
self,
axis=0,
Expand Down
48 changes: 47 additions & 1 deletion python/cudf/cudf/core/indexed_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
is_integer_dtype,
is_list_like,
)
from cudf.core.column import arange
from cudf.core.column import arange, as_column
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.frame import Frame
from cudf.core.index import Index, RangeIndex, _index_from_columns
from cudf.core.multiindex import MultiIndex
from cudf.core.udf.utils import _compile_or_get, _supported_cols_from_frame
from cudf.utils.utils import cached_property

doc_reset_index_template = """
Expand Down Expand Up @@ -756,6 +757,51 @@ def add_suffix(self, suffix):
Use `Series.add_suffix` or `DataFrame.add_suffix`"
)

@annotate("APPLY", color="purple", domain="cudf_python")
def _apply(self, func, kernel_getter, *args, **kwargs):
"""Apply `func` across the rows of the frame."""
if kwargs:
raise ValueError("UDFs using **kwargs are not yet supported.")

try:
kernel, retty = _compile_or_get(
self, func, args, kernel_getter=kernel_getter
)
except Exception as e:
raise ValueError(
"user defined function compilation failed."
) from e

# Mask and data column preallocated
ans_col = cp.empty(len(self), dtype=retty)
ans_mask = cudf.core.column.column_empty(len(self), dtype="bool")
launch_args = [(ans_col, ans_mask), len(self)]
offsets = []

# if _compile_or_get succeeds, it is safe to create a kernel that only
# consumes the columns that are of supported dtype
for col in _supported_cols_from_frame(self).values():
data = col.data
mask = col.mask
if mask is None:
launch_args.append(data)
else:
launch_args.append((data, mask))
offsets.append(col.offset)
launch_args += offsets
launch_args += list(args)

try:
kernel.forall(len(self))(*launch_args)
except Exception as e:
raise RuntimeError("UDF kernel execution failed.") from e

col = as_column(ans_col)
col.set_base_mask(libcudf.transform.bools_to_mask(ans_mask))
result = cudf.Series._from_data({None: col}, self._index)

return result

def sort_values(
self,
by,
Expand Down
21 changes: 5 additions & 16 deletions python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import cupy
import numpy as np
import pandas as pd
from numba import cuda
from pandas._config import get_option

import cudf
Expand Down Expand Up @@ -67,6 +66,7 @@
doc_reset_index_template,
)
from cudf.core.single_column_frame import SingleColumnFrame
from cudf.core.udf.scalar_function import _get_scalar_kernel
from cudf.utils import cudautils, docutils
from cudf.utils.docutils import copy_docstring
from cudf.utils.dtypes import (
Expand Down Expand Up @@ -2374,7 +2374,7 @@ def apply(self, func, convert_dtype=True, args=(), **kwargs):
by numba based on the function logic and argument types.
See examples for details.
args : tuple
Not supported
Positional arguments passed to func after the series value.
**kwargs
Not supported
Expand Down Expand Up @@ -2440,20 +2440,9 @@ def apply(self, func, convert_dtype=True, args=(), **kwargs):
2 4.5
dtype: float64
"""
if args or kwargs:
raise ValueError(
"UDFs using *args or **kwargs are not yet supported."
)

# these functions are generally written as functions of scalar
# values rather than rows. Rather than writing an entirely separate
# numba kernel that is not built around a row object, its simpler
# to just turn this into the equivalent single column dataframe case
name = self.name or "__temp_srname"
df = cudf.DataFrame({name: self})
f_ = cuda.jit(device=True)(func)

return df.apply(lambda row: f_(row[name]))
if convert_dtype is not True:
raise ValueError("Series.apply only supports convert_dtype=True")
return self._apply(func, _get_scalar_kernel, *args, **kwargs)

def applymap(self, udf, out_dtype=None):
"""Apply an elementwise function to transform the values in the Column.
Expand Down
Loading

0 comments on commit 896564a

Please sign in to comment.