Support args= in apply (#9514)
Closes #9500

Allows passing `args=` to `DataFrame.apply`, as is supported in pandas: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.apply.html

Concretely, this enables the following:

```python
import cudf
df = cudf.DataFrame({"a": [1, 2, 3]})

def f(row, c):
    return row['a'] + c

res = df.apply(f, args=(3,))
```
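
Here `res` is a Series equal to `df['a'] + 3`, i.e. `[4, 5, 6]`. Multiple extra positional arguments work the same way (mirroring the new tests below); a minimal sketch reusing the frame above, with a hypothetical helper `g`:

```python
# Sketch: several scalars are forwarded positionally after the row.
def g(row, c, k):
    return row['a'] * c + k

res2 = df.apply(g, args=(10, 1))  # expected: [11, 21, 31]
```

Note that `**kwargs` are still rejected with a `ValueError` (see the check in `DataFrame.apply` below).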

cc @randerzander

Authors:
  - https://github.com/brandon-b-miller

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #9514
brandon-b-miller authored Nov 4, 2021
1 parent 9d375f5 commit f041a47
Showing 4 changed files with 73 additions and 29 deletions.
8 changes: 4 additions & 4 deletions python/cudf/cudf/core/dataframe.py
@@ -3924,7 +3924,7 @@ def apply(
result_type: {'expand', 'reduce', 'broadcast', None}, default None
Not yet supported
args: tuple
Not yet supported
Positional arguments to pass to func in addition to the dataframe.
Examples
--------
@@ -4071,10 +4071,10 @@ def apply(
raise ValueError("The `raw` kwarg is not yet supported.")
if result_type is not None:
raise ValueError("The `result_type` kwarg is not yet supported.")
if args or kwargs:
raise ValueError("args and kwargs are not yet supported.")
if kwargs:
raise ValueError("UDFs using **kwargs are not yet supported.")

return self._apply(func)
return self._apply(func, *args)

@applyutils.doc_apply()
def apply_rows(
8 changes: 4 additions & 4 deletions python/cudf/cudf/core/frame.py
@@ -1559,16 +1559,16 @@ def _quantiles(
return result

@annotate("APPLY", color="purple", domain="cudf_python")
def _apply(self, func):
def _apply(self, func, *args):
"""
Apply `func` across the rows of the frame.
"""
kernel, retty = compile_or_get(self, func)
kernel, retty = compile_or_get(self, func, args)

# Mask and data column preallocated
ans_col = cupy.empty(len(self), dtype=retty)
ans_mask = cudf.core.column.column_empty(len(self), dtype="bool")
launch_args = [(ans_col, ans_mask)]
launch_args = [(ans_col, ans_mask), len(self)]
offsets = []
for col in self._data.values():
data = col.data
@@ -1579,7 +1579,7 @@ def _apply(self, func):
launch_args.append((data, mask))
offsets.append(col.offset)
launch_args += offsets
launch_args.append(len(self)) # size
launch_args += list(args)
kernel.forall(len(self))(*launch_args)

result = cudf.Series(ans_col).set_mask(
33 changes: 18 additions & 15 deletions python/cudf/cudf/core/udf/pipeline.py
@@ -2,7 +2,7 @@

import cachetools
import numpy as np
from numba import cuda
from numba import cuda, typeof
from numba.np import numpy_support
from numba.types import Record, Tuple, boolean, int64, void
from nvtx import annotate
@@ -66,17 +66,19 @@ def get_frame_row_type(fr):


@annotate("NUMBA JIT", color="green", domain="cudf_python")
def get_udf_return_type(func, df):
def get_udf_return_type(func, df, args=()):
"""
Get the return type of a masked UDF for a given set of argument dtypes. It
is assumed that a `MaskedType(dtype)` is passed to the function for each
input dtype.
"""
# The users function args should be a row of the frame and then extra args
row_type = get_frame_row_type(df)
compile_sig = (row_type, *(typeof(arg) for arg in args))

# Get the return type. The PTX is also returned by compile_udf, but is not
# needed here.
ptx, output_type = cudautils.compile_udf(func, (row_type,))
ptx, output_type = cudautils.compile_udf(func, compile_sig)
if not isinstance(output_type, MaskedType):
numba_output_type = numpy_support.from_dtype(np.dtype(output_type))
else:
@@ -99,7 +101,7 @@ def masked_array_type_from_col(col):
return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1]))


def construct_signature(df, return_type):
def construct_signature(df, return_type, args):
"""
Build the signature of numba types that will be used to
actually JIT the kernel itself later, accounting for types
@@ -109,13 +111,13 @@ def construct_signature(df, return_type):
# Tuple of arrays, first the output data array, then the mask
return_type = Tuple((return_type[::1], boolean[::1]))
offsets = []
sig = [return_type]
sig = [return_type, int64]
for col in df._data.values():
sig.append(masked_array_type_from_col(col))
offsets.append(int64)

# return_type + data,masks + offsets + size
sig = void(*(sig + offsets + [int64]))
# return_type, size, data, masks, offsets, extra args
sig = void(*(sig + offsets + [typeof(arg) for arg in args]))

return sig

@@ -126,7 +128,7 @@ def mask_get(mask, pos):


kernel_template = """\
def _kernel(retval, {input_columns}, {input_offsets}, size):
def _kernel(retval, size, {input_columns}, {input_offsets}, {extra_args}):
i = cuda.grid(1)
ret_data_arr, ret_mask_arr = retval
if i < size:
@@ -140,7 +142,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size):
{row_initializers}
# pass the assembled row into the udf
ret = f_(row)
ret = f_(row, {extra_args})
# pack up the return values and set them
ret_masked = pack_return(ret)
@@ -163,7 +165,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size):
"""


def _define_function(fr, row_type, scalar_return=False):
def _define_function(fr, row_type, args, scalar_return=False):
"""
The kernel we want to JIT compile looks something like the following,
which is an example for two columns that both have nulls present
@@ -199,6 +201,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):
# Create argument list for kernel
input_columns = ", ".join([f"input_col_{i}" for i in range(len(fr._data))])
input_offsets = ", ".join([f"offset_{i}" for i in range(len(fr._data))])
extra_args = ", ".join([f"extra_arg_{i}" for i in range(len(args))])

# Generate the initializers for each device function argument
initializers = []
@@ -226,6 +229,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):
d = {
"input_columns": input_columns,
"input_offsets": input_offsets,
"extra_args": extra_args,
"masked_input_initializers": masked_input_initializers,
"row_initializers": row_initializers,
"numba_rectype": row_type, # from global
@@ -235,7 +239,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):


@annotate("UDF COMPILATION", color="darkgreen", domain="cudf_python")
def compile_or_get(df, f):
def compile_or_get(df, f, args):
"""
Return a compiled kernel in terms of MaskedTypes that launches a
kernel equivalent of `f` for the dtypes of `df`. The kernel uses
@@ -270,7 +274,7 @@ def compile_or_get(df, f):

# precompile the user udf to get the right return type.
# could be a MaskedType or a scalar type.
numba_return_type = get_udf_return_type(f, df)
numba_return_type = get_udf_return_type(f, df, args)

_is_scalar_return = not isinstance(numba_return_type, MaskedType)
scalar_return_type = (
@@ -280,8 +284,7 @@ def compile_or_get(df, f):
)

# this is the signature for the final full kernel compilation
sig = construct_signature(df, scalar_return_type)

sig = construct_signature(df, scalar_return_type, args)
# this row type is used within the kernel to pack up the column and
# mask data into the dict like data structure the user udf expects
row_type = get_frame_row_type(df)
@@ -298,7 +301,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):
"row_type": row_type,
}
exec(
_define_function(df, row_type, scalar_return=_is_scalar_return),
_define_function(df, row_type, args, scalar_return=_is_scalar_return),
global_exec_context,
local_exec_context,
)
53 changes: 47 additions & 6 deletions python/cudf/cudf/tests/test_udf_masked_ops.py
@@ -10,21 +10,21 @@
from cudf.testing._utils import NUMERIC_TYPES, assert_eq


def run_masked_udf_test(func_pdf, func_gdf, data, **kwargs):
def run_masked_udf_test(func_pdf, func_gdf, data, args=(), **kwargs):
gdf = data
pdf = data.to_pandas(nullable=True)

expect = pdf.apply(func_pdf, axis=1)
obtain = gdf.apply(func_gdf, axis=1)
expect = pdf.apply(func_pdf, args=args, axis=1)
obtain = gdf.apply(func_gdf, args=args, axis=1)
assert_eq(expect, obtain, **kwargs)


def run_masked_udf_series(func_psr, func_gsr, data, **kwargs):
def run_masked_udf_series(func_psr, func_gsr, data, args=(), **kwargs):
gsr = data
psr = data.to_pandas(nullable=True)

expect = psr.apply(func_psr)
obtain = gsr.apply(func_gsr)
expect = psr.apply(func_psr, args=args)
obtain = gsr.apply(func_gsr, args=args)
assert_eq(expect, obtain, **kwargs)


@@ -586,3 +586,44 @@ def func(row):

data = cudf.DataFrame(data)
run_masked_udf_test(func, func, data)


# tests for `DataFrame.apply(f, args=(x,y,z))`
# testing the whole space of possibilities is intractable
# these test the most rudimentary guaranteed functionality
@pytest.mark.parametrize(
"data",
[
{"a": [1, cudf.NA, 3]},
{"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]},
{"a": [True, False, cudf.NA]},
],
)
@pytest.mark.parametrize("op", arith_ops + comparison_ops)
def test_masked_udf_scalar_args_binops(data, op):
data = cudf.DataFrame(data)

def func(row, c):
return op(row["a"], c)

run_masked_udf_test(func, func, data, args=(1,), check_dtype=False)


@pytest.mark.parametrize(
"data",
[
{"a": [1, cudf.NA, 3]},
{"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]},
{"a": [True, False, cudf.NA]},
],
)
@pytest.mark.parametrize("op", arith_ops + comparison_ops)
def test_masked_udf_scalar_args_binops_multiple(data, op):
data = cudf.DataFrame(data)

def func(row, c, k):
x = op(row["a"], c)
y = op(x, k)
return y

run_masked_udf_test(func, func, data, args=(1, 2), check_dtype=False)
