From f041a47954cd3dd7498589dd190b0eac6b0fa169 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Thu, 4 Nov 2021 08:39:27 -0500 Subject: [PATCH] Support `args=` in `apply` (#9514) Closes https://github.com/rapidsai/cudf/issues/9500 Allows passing `args=` to `DataFrame.apply` as is supported in pandas: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.apply.html Concretely, this allows for this: ```python import cudf df = cudf.DataFrame({ 'a':[1,2,3] }) def f(row, c): return row['a'] + c res = df.apply(f, args=(3,)) ``` cc @randerzander Authors: - https://github.com/brandon-b-miller Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/9514 --- python/cudf/cudf/core/dataframe.py | 8 +-- python/cudf/cudf/core/frame.py | 8 +-- python/cudf/cudf/core/udf/pipeline.py | 33 ++++++------ python/cudf/cudf/tests/test_udf_masked_ops.py | 53 ++++++++++++++++--- 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 42389ef6e4b..464357060e6 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -3924,7 +3924,7 @@ def apply( result_type: {'expand', 'reduce', 'broadcast', None}, default None Not yet supported args: tuple - Not yet supported + Positional arguments to pass to func in addition to the dataframe. 
Examples -------- @@ -4071,10 +4071,10 @@ def apply( raise ValueError("The `raw` kwarg is not yet supported.") if result_type is not None: raise ValueError("The `result_type` kwarg is not yet supported.") - if args or kwargs: - raise ValueError("args and kwargs are not yet supported.") + if kwargs: + raise ValueError("UDFs using **kwargs are not yet supported.") - return self._apply(func) + return self._apply(func, *args) @applyutils.doc_apply() def apply_rows( diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 2c469a4ea6a..319990c0111 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1559,16 +1559,16 @@ def _quantiles( return result @annotate("APPLY", color="purple", domain="cudf_python") - def _apply(self, func): + def _apply(self, func, *args): """ Apply `func` across the rows of the frame. """ - kernel, retty = compile_or_get(self, func) + kernel, retty = compile_or_get(self, func, args) # Mask and data column preallocated ans_col = cupy.empty(len(self), dtype=retty) ans_mask = cudf.core.column.column_empty(len(self), dtype="bool") - launch_args = [(ans_col, ans_mask)] + launch_args = [(ans_col, ans_mask), len(self)] offsets = [] for col in self._data.values(): data = col.data @@ -1579,7 +1579,7 @@ def _apply(self, func): launch_args.append((data, mask)) offsets.append(col.offset) launch_args += offsets - launch_args.append(len(self)) # size + launch_args += list(args) kernel.forall(len(self))(*launch_args) result = cudf.Series(ans_col).set_mask( diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index b52668fcd05..6d9ae86d0fa 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -2,7 +2,7 @@ import cachetools import numpy as np -from numba import cuda +from numba import cuda, typeof from numba.np import numpy_support from numba.types import Record, Tuple, boolean, int64, void from nvtx import annotate @@ -66,17 
+66,19 @@ def get_frame_row_type(fr): @annotate("NUMBA JIT", color="green", domain="cudf_python") -def get_udf_return_type(func, df): +def get_udf_return_type(func, df, args=()): """ Get the return type of a masked UDF for a given set of argument dtypes. It is assumed that a `MaskedType(dtype)` is passed to the function for each input dtype. """ + # The user's function args should be a row of the frame followed by any extra args row_type = get_frame_row_type(df) + compile_sig = (row_type, *(typeof(arg) for arg in args)) # Get the return type. The PTX is also returned by compile_udf, but is not # needed here. - ptx, output_type = cudautils.compile_udf(func, (row_type,)) + ptx, output_type = cudautils.compile_udf(func, compile_sig) if not isinstance(output_type, MaskedType): numba_output_type = numpy_support.from_dtype(np.dtype(output_type)) else: @@ -99,7 +101,7 @@ def masked_array_type_from_col(col): return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) -def construct_signature(df, return_type): +def construct_signature(df, return_type, args): """ Build the signature of numba types that will be used to actually JIT the kernel itself later, accounting for types @@ -109,13 +111,13 @@ def construct_signature(df, return_type): # Tuple of arrays, first the output data array, then the mask return_type = Tuple((return_type[::1], boolean[::1])) offsets = [] - sig = [return_type] + sig = [return_type, int64] for col in df._data.values(): sig.append(masked_array_type_from_col(col)) offsets.append(int64) - # return_type + data,masks + offsets + size - sig = void(*(sig + offsets + [int64])) + # return_type, size, data, masks, offsets, extra args + sig = void(*(sig + offsets + [typeof(arg) for arg in args])) return sig @@ -126,7 +128,7 @@ def mask_get(mask, pos): kernel_template = """\ -def _kernel(retval, {input_columns}, {input_offsets}, size): +def _kernel(retval, size, {input_columns}, {input_offsets}, {extra_args}): i = cuda.grid(1) ret_data_arr, ret_mask_arr = retval if i 
< size: @@ -140,7 +142,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): {row_initializers} # pass the assembled row into the udf - ret = f_(row) + ret = f_(row, {extra_args}) # pack up the return values and set them ret_masked = pack_return(ret) @@ -163,7 +165,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): """ -def _define_function(fr, row_type, scalar_return=False): +def _define_function(fr, row_type, args, scalar_return=False): """ The kernel we want to JIT compile looks something like the following, which is an example for two columns that both have nulls present @@ -199,6 +201,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size): # Create argument list for kernel input_columns = ", ".join([f"input_col_{i}" for i in range(len(fr._data))]) input_offsets = ", ".join([f"offset_{i}" for i in range(len(fr._data))]) + extra_args = ", ".join([f"extra_arg_{i}" for i in range(len(args))]) # Generate the initializers for each device function argument initializers = [] @@ -226,6 +229,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size): d = { "input_columns": input_columns, "input_offsets": input_offsets, + "extra_args": extra_args, "masked_input_initializers": masked_input_initializers, "row_initializers": row_initializers, "numba_rectype": row_type, # from global @@ -235,7 +239,7 @@ def _kernel(retval, input_col_0, input_col_1, offset_0, offset_1, size): @annotate("UDF COMPILATION", color="darkgreen", domain="cudf_python") -def compile_or_get(df, f): +def compile_or_get(df, f, args): """ Return a compiled kernel in terms of MaskedTypes that launches a kernel equivalent of `f` for the dtypes of `df`. The kernel uses @@ -270,7 +274,7 @@ def compile_or_get(df, f): # precompile the user udf to get the right return type. # could be a MaskedType or a scalar type. 
- numba_return_type = get_udf_return_type(f, df) + numba_return_type = get_udf_return_type(f, df, args) _is_scalar_return = not isinstance(numba_return_type, MaskedType) scalar_return_type = ( @@ -280,8 +284,7 @@ def compile_or_get(df, f): ) # this is the signature for the final full kernel compilation - sig = construct_signature(df, scalar_return_type) - + sig = construct_signature(df, scalar_return_type, args) # this row type is used within the kernel to pack up the column and # mask data into the dict like data structure the user udf expects row_type = get_frame_row_type(df) @@ -298,7 +301,7 @@ def compile_or_get(df, f): "row_type": row_type, } exec( - _define_function(df, row_type, scalar_return=_is_scalar_return), + _define_function(df, row_type, args, scalar_return=_is_scalar_return), global_exec_context, local_exec_context, ) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index c75915629c2..eccd174fd56 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -10,21 +10,21 @@ from cudf.testing._utils import NUMERIC_TYPES, assert_eq -def run_masked_udf_test(func_pdf, func_gdf, data, **kwargs): +def run_masked_udf_test(func_pdf, func_gdf, data, args=(), **kwargs): gdf = data pdf = data.to_pandas(nullable=True) - expect = pdf.apply(func_pdf, axis=1) - obtain = gdf.apply(func_gdf, axis=1) + expect = pdf.apply(func_pdf, args=args, axis=1) + obtain = gdf.apply(func_gdf, args=args, axis=1) assert_eq(expect, obtain, **kwargs) -def run_masked_udf_series(func_psr, func_gsr, data, **kwargs): +def run_masked_udf_series(func_psr, func_gsr, data, args=(), **kwargs): gsr = data psr = data.to_pandas(nullable=True) - expect = psr.apply(func_psr) - obtain = gsr.apply(func_gsr) + expect = psr.apply(func_psr, args=args) + obtain = gsr.apply(func_gsr, args=args) assert_eq(expect, obtain, **kwargs) @@ -586,3 +586,44 @@ def func(row): data = cudf.DataFrame(data) 
run_masked_udf_test(func, func, data) + + +# tests for `DataFrame.apply(f, args=(x,y,z))` +# testing the whole space of possibilities is intractable +# these test the most rudimentary guaranteed functionality +@pytest.mark.parametrize( + "data", + [ + {"a": [1, cudf.NA, 3]}, + {"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]}, + {"a": [True, False, cudf.NA]}, + ], +) +@pytest.mark.parametrize("op", arith_ops + comparison_ops) +def test_masked_udf_scalar_args_binops(data, op): + data = cudf.DataFrame(data) + + def func(row, c): + return op(row["a"], c) + + run_masked_udf_test(func, func, data, args=(1,), check_dtype=False) + + +@pytest.mark.parametrize( + "data", + [ + {"a": [1, cudf.NA, 3]}, + {"a": [0.5, 2.0, cudf.NA, cudf.NA, 5.0]}, + {"a": [True, False, cudf.NA]}, + ], +) +@pytest.mark.parametrize("op", arith_ops + comparison_ops) +def test_masked_udf_scalar_args_binops_multiple(data, op): + data = cudf.DataFrame(data) + + def func(row, c, k): + x = op(row["a"], c) + y = op(x, k) + return y + + run_masked_udf_test(func, func, data, args=(1, 2), check_dtype=False)