Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support args= in apply #9514

Merged
merged 7 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4091,7 +4091,7 @@ def apply(
result_type: {'expand', 'reduce', 'broadcast', None}, default None
Not yet supported
args: tuple
Not yet supported
Positional arguments to pass to func in addition to the dataframe.

Examples
--------
Expand Down Expand Up @@ -4238,10 +4238,10 @@ def apply(
raise ValueError("The `raw` kwarg is not yet supported.")
if result_type is not None:
raise ValueError("The `result_type` kwarg is not yet supported.")
if args or kwargs:
raise ValueError("args and kwargs are not yet supported.")
if kwargs:
raise ValueError("UDFs using **kwargs are not yet supported.")

return self._apply(func)
return self._apply(func, *args)

@applyutils.doc_apply()
def apply_rows(
Expand Down
8 changes: 4 additions & 4 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1553,16 +1553,16 @@ def _quantiles(
return result

@annotate("APPLY", color="purple", domain="cudf_python")
def _apply(self, func):
def _apply(self, func, *args):
"""
Apply `func` across the rows of the frame.
"""
kernel, retty = compile_or_get(self, func)
kernel, retty = compile_or_get(self, func, args)

# Mask and data column preallocated
ans_col = cupy.empty(len(self), dtype=retty)
ans_mask = cudf.core.column.column_empty(len(self), dtype="bool")
launch_args = [(ans_col, ans_mask)]
launch_args = [(ans_col, ans_mask), len(self)]
offsets = []
for col in self._data.values():
data = col.data
Expand All @@ -1573,7 +1573,7 @@ def _apply(self, func):
launch_args.append((data, mask))
offsets.append(col.offset)
launch_args += offsets
launch_args.append(len(self)) # size
launch_args += list(args)
kernel.forall(len(self))(*launch_args)

result = cudf.Series(ans_col).set_mask(
Expand Down
33 changes: 18 additions & 15 deletions python/cudf/cudf/core/udf/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import cachetools
import numpy as np
from numba import cuda
from numba import cuda, typeof
from numba.np import numpy_support
from numba.types import Record, Tuple, boolean, int64, void
from nvtx import annotate
Expand Down Expand Up @@ -66,17 +66,19 @@ def get_frame_row_type(fr):


@annotate("NUMBA JIT", color="green", domain="cudf_python")
def get_udf_return_type(func, df):
def get_udf_return_type(func, df, args=()):
"""
Get the return type of a masked UDF for a given set of argument dtypes. It
is assumed that a `MaskedType(dtype)` is passed to the function for each
input dtype.
"""
# The users function args should be a row of the frame and then extra args
row_type = get_frame_row_type(df)
compile_sig = (row_type, *(typeof(arg) for arg in args))

# Get the return type. The PTX is also returned by compile_udf, but is not
# needed here.
ptx, output_type = cudautils.compile_udf(func, (row_type,))
ptx, output_type = cudautils.compile_udf(func, compile_sig)
if not isinstance(output_type, MaskedType):
numba_output_type = numpy_support.from_dtype(np.dtype(output_type))
else:
Expand All @@ -99,7 +101,7 @@ def masked_array_type_from_col(col):
return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1]))


def construct_signature(df, return_type):
def construct_signature(df, return_type, args):
"""
Build the signature of numba types that will be used to
actually JIT the kernel itself later, accounting for types
Expand All @@ -109,13 +111,13 @@ def construct_signature(df, return_type):
# Tuple of arrays, first the output data array, then the mask
return_type = Tuple((return_type[::1], boolean[::1]))
offsets = []
sig = [return_type]
sig = [return_type, int64]
for col in df._data.values():
sig.append(masked_array_type_from_col(col))
offsets.append(int64)

# return_type + data,masks + offsets + size
sig = void(*(sig + offsets + [int64]))
# return_type, size, data, masks, offsets, extra args
sig = void(*(sig + offsets + [typeof(arg) for arg in args]))

return sig

Expand All @@ -126,7 +128,7 @@ def mask_get(mask, pos):


kernel_template = """\
def _kernel(retval, {input_columns}, {input_offsets}, size):
def _kernel(retval, size, {input_columns}, {input_offsets}, {extra_args}):
i = cuda.grid(1)
ret_data_arr, ret_mask_arr = retval
if i < size:
Expand All @@ -140,7 +142,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size):
{row_initializers}

# pass the assembled row into the udf
ret = f_(row)
ret = f_(row, {extra_args})

# pack up the return values and set them
ret_masked = pack_return(ret)
Expand All @@ -163,7 +165,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size):
"""


def _define_function(fr, row_type, scalar_return=False):
def _define_function(fr, row_type, args, scalar_return=False):
"""
The kernel we want to JIT compile looks something like the following,
which is an example for two columns that both have nulls present
Expand Down Expand Up @@ -199,6 +201,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):
# Create argument list for kernel
input_columns = ", ".join([f"input_col_{i}" for i in range(len(fr._data))])
input_offsets = ", ".join([f"offset_{i}" for i in range(len(fr._data))])
extra_args = ", ".join([f"extra_arg_{i}" for i in range(len(args))])

# Generate the initializers for each device function argument
initializers = []
Expand Down Expand Up @@ -226,6 +229,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):
d = {
"input_columns": input_columns,
"input_offsets": input_offsets,
"extra_args": extra_args,
"masked_input_initializers": masked_input_initializers,
"row_initializers": row_initializers,
"numba_rectype": row_type, # from global
Expand All @@ -235,7 +239,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size):


@annotate("UDF COMPILATION", color="darkgreen", domain="cudf_python")
def compile_or_get(df, f):
def compile_or_get(df, f, args):
"""
Return a compiled kernel in terms of MaskedTypes that launches a
kernel equivalent of `f` for the dtypes of `df`. The kernel uses
Expand Down Expand Up @@ -270,7 +274,7 @@ def compile_or_get(df, f):

# precompile the user udf to get the right return type.
# could be a MaskedType or a scalar type.
numba_return_type = get_udf_return_type(f, df)
numba_return_type = get_udf_return_type(f, df, args)

_is_scalar_return = not isinstance(numba_return_type, MaskedType)
scalar_return_type = (
Expand All @@ -280,8 +284,7 @@ def compile_or_get(df, f):
)

# this is the signature for the final full kernel compilation
sig = construct_signature(df, scalar_return_type)

sig = construct_signature(df, scalar_return_type, args)
# this row type is used within the kernel to pack up the column and
# mask data into the dict like data structure the user udf expects
row_type = get_frame_row_type(df)
Expand All @@ -298,7 +301,7 @@ def compile_or_get(df, f):
"row_type": row_type,
}
exec(
_define_function(df, row_type, scalar_return=_is_scalar_return),
_define_function(df, row_type, args, scalar_return=_is_scalar_return),
global_exec_context,
local_exec_context,
)
Expand Down
53 changes: 47 additions & 6 deletions python/cudf/cudf/tests/test_udf_masked_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
from cudf.testing._utils import NUMERIC_TYPES, assert_eq


def run_masked_udf_test(func_pdf, func_gdf, data, **kwargs):
def run_masked_udf_test(func_pdf, func_gdf, data, args=(), **kwargs):
gdf = data
pdf = data.to_pandas(nullable=True)

expect = pdf.apply(func_pdf, axis=1)
obtain = gdf.apply(func_gdf, axis=1)
expect = pdf.apply(func_pdf, args=args, axis=1)
obtain = gdf.apply(func_gdf, args=args, axis=1)
assert_eq(expect, obtain, **kwargs)


def run_masked_udf_series(func_psr, func_gsr, data, **kwargs):
def run_masked_udf_series(func_psr, func_gsr, data, args=(), **kwargs):
gsr = data
psr = data.to_pandas(nullable=True)

expect = psr.apply(func_psr)
obtain = gsr.apply(func_gsr)
expect = psr.apply(func_psr, args=args)
obtain = gsr.apply(func_gsr, args=args)
assert_eq(expect, obtain, **kwargs)


Expand Down Expand Up @@ -586,3 +586,44 @@ def func(row):

data = cudf.DataFrame(data)
run_masked_udf_test(func, func, data)


# tests for `DataFrame.apply(f, args=(x,y,z))`
# testing the whole space of possibilities is intractable
# these test the most rudimentary guaranteed functionality
@pytest.mark.parametrize(
"data",
[
{"a": [1, cudf.NA, 3]},
{"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]},
{"a": [True, False, cudf.NA]},
],
)
@pytest.mark.parametrize("op", arith_ops + comparison_ops)
def test_masked_udf_scalar_args_binops(data, op):
data = cudf.DataFrame(data)

def func(row, c):
return op(row["a"], c)

run_masked_udf_test(func, func, data, args=(1,), check_dtype=False)


@pytest.mark.parametrize(
"data",
[
{"a": [1, cudf.NA, 3]},
{"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]},
{"a": [True, False, cudf.NA]},
],
)
@pytest.mark.parametrize("op", arith_ops + comparison_ops)
def test_masked_udf_scalar_args_binops_multiple(data, op):
data = cudf.DataFrame(data)

def func(row, c, k):
x = op(row["a"], c)
y = op(x, k)
return y

run_masked_udf_test(func, func, data, args=(1, 2), check_dtype=False)