From a8c0f4be674bb782ccd8739481479fcfa2982502 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Wed, 16 Nov 2022 11:31:57 +0000 Subject: [PATCH 1/4] Fix type promotion edge cases in numerical binops (#12074) The type normalisation applied before heading into libcudf previously had slightly unexpected consequences for large int64 values. If not providing a `cudf.Scalar`, a bare `int64` scalar would be cast to `uint64` and then normal numpy type promotion would unify to `float64`. This is lossy, since int64 to float64 is neither surjective nor injective. To avoid this, try very hard to maintain the dtype of the object coming in, and match pandas behaviour by applying numpy type promotion rules via `numpy.result_type`. - Closes #5938 - Closes #7389 - Closes #12072 - Closes #12092 Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/12074 --- python/cudf/cudf/core/column/column.py | 3 +- python/cudf/cudf/core/column/numerical.py | 53 +++++---- python/cudf/cudf/core/column/timedelta.py | 10 +- python/cudf/cudf/core/index.py | 5 +- python/cudf/cudf/core/series.py | 5 +- python/cudf/cudf/tests/test_binops.py | 61 ++++++++++ python/cudf/cudf/tests/test_series.py | 6 + python/cudf/cudf/tests/test_timedelta.py | 135 ++++++++++++++++++++-- 8 files changed, 236 insertions(+), 42 deletions(-) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 6c17b492f8a..59851a1c11b 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -502,7 +502,8 @@ def _wrap_binop_normalization(self, other): if other is NA or other is None: return cudf.Scalar(other, dtype=self.dtype) if isinstance(other, np.ndarray) and other.ndim == 0: - other = other.item() + # Try and maintain the dtype + other = other.dtype.type(other.item()) return self.normalize_binop_value(other) def _scatter_by_slice( diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index f126f47c3c2..7943135afe1 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -35,7 +35,7 @@ is_number, is_scalar, ) -from cudf.core.buffer import Buffer, as_buffer, cuda_array_interface_wrapper +from cudf.core.buffer import Buffer, cuda_array_interface_wrapper from cudf.core.column import ( ColumnBase, as_column, @@ -225,10 +225,18 @@ def _binaryop(self, other: ColumnBinaryOperand, op: str) -> ColumnBase: (tmp.dtype.type in int_float_dtype_mapping) and (tmp.dtype.type != np.bool_) and ( - (np.isscalar(tmp) and (0 == tmp)) - or ( - (isinstance(tmp, NumericalColumn)) and (0.0 in tmp) + ( + ( + np.isscalar(tmp) + or ( + isinstance(tmp, cudf.Scalar) + # host to device copy + and tmp.is_valid() + ) + ) + and (0 == tmp) ) + or ((isinstance(tmp, NumericalColumn)) and (0 in tmp)) ) ): out_dtype = cudf.dtype("float64") @@ -274,7 +282,7 @@ def nans_to_nulls(self: NumericalColumn) -> NumericalColumn: def normalize_binop_value( self, other: ScalarLike - ) -> Union[ColumnBase, ScalarLike]: + ) -> Union[ColumnBase, cudf.Scalar]: if isinstance(other, ColumnBase): if not isinstance(other, NumericalColumn): return NotImplemented @@ -285,25 +293,24 @@ def normalize_binop_value( # expensive device-host transfer just to # adjust the dtype other = other.value - other_dtype = np.min_scalar_type(other) - if other_dtype.kind in {"b", "i", "u", "f"}: - if isinstance(other, cudf.Scalar): - return other - other_dtype = np.promote_types(self.dtype, other_dtype) - if other_dtype == np.dtype("float16"): - other_dtype = cudf.dtype("float32") - other = other_dtype.type(other) + # Try and match pandas and hence numpy. Deduce the common + # dtype via the _value_ of other, and the dtype of self. TODO: + # When NEP50 is accepted, this might want changed or + # simplified. + # This is not at all simple: + # np.result_type(np.int64(0), np.uint8) + # => np.uint8 + # np.result_type(np.asarray([0], dtype=np.int64), np.uint8) + # => np.int64 + # np.promote_types(np.int64(0), np.uint8) + # => np.int64 + # np.promote_types(np.asarray([0], dtype=np.int64).dtype, np.uint8) + # => np.int64 + common_dtype = np.result_type(self.dtype, other) + if common_dtype.kind in {"b", "i", "u", "f"}: if self.dtype.kind == "b": - other_dtype = min_signed_type(other) - if np.isscalar(other): - return cudf.dtype(other_dtype).type(other) - else: - ary = full(len(self), other, dtype=other_dtype) - return column.build_column( - data=as_buffer(ary), - dtype=ary.dtype, - mask=self.mask, - ) + common_dtype = min_signed_type(other) + return cudf.Scalar(other, dtype=common_dtype) else: return NotImplemented diff --git a/python/cudf/cudf/core/column/timedelta.py b/python/cudf/cudf/core/column/timedelta.py index 3dc923e7ded..901547d94a9 100644 --- a/python/cudf/cudf/core/column/timedelta.py +++ b/python/cudf/cudf/core/column/timedelta.py @@ -181,17 +181,17 @@ def _binaryop(self, other: ColumnBinaryOperand, op: str) -> ColumnBase: out_dtype = determine_out_dtype(self.dtype, other.dtype) elif op in {"__truediv__", "__floordiv__"}: common_dtype = determine_out_dtype(self.dtype, other.dtype) - this = self.astype(common_dtype).astype("float64") + out_dtype = np.float64 if op == "__truediv__" else np.int64 + this = self.astype(common_dtype).astype(out_dtype) if isinstance(other, cudf.Scalar): if other.is_valid(): other = other.value.astype(common_dtype).astype( - "float64" + out_dtype ) else: - other = cudf.Scalar(None, "float64") + other = cudf.Scalar(None, out_dtype) else: - other = other.astype(common_dtype).astype("float64") - out_dtype = np.float64 if op == "__truediv__" else np.int64 + other = other.astype(common_dtype).astype(out_dtype) elif op in {"__add__", "__sub__"}: out_dtype = determine_out_dtype(self.dtype, other.dtype) elif other.dtype.kind in {"f", "i", "u"}: diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 61971e3c749..e561dd0a214 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -2072,7 +2072,10 @@ def microsecond(self): """ # noqa: E501 return as_index( ( - self._values.get_dt_field("millisecond") + # Need to manually promote column to int32 because + # pandas-matching binop behaviour requires that this + # __mul__ returns an int16 column. + self._values.get_dt_field("millisecond").astype("int32") * cudf.Scalar(1000, dtype="int32") ) + self._values.get_dt_field("microsecond"), diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 873bebf1292..8f4f6fe57d6 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -3660,7 +3660,10 @@ def microsecond(self): """ return Series( data=( - self.series._column.get_dt_field("millisecond") + # Need to manually promote column to int32 because + # pandas-matching binop behaviour requires that this + # __mul__ returns an int16 column. + self.series._column.get_dt_field("millisecond").astype("int32") * cudf.Scalar(1000, dtype="int32") ) + self.series._column.get_dt_field("microsecond"), diff --git a/python/cudf/cudf/tests/test_binops.py b/python/cudf/cudf/tests/test_binops.py index c8b8dfa1e60..8337084be72 100644 --- a/python/cudf/cudf/tests/test_binops.py +++ b/python/cudf/cudf/tests/test_binops.py @@ -877,6 +877,67 @@ def test_binop_bool_uint(func, rhs): ) +@pytest.mark.parametrize( + "series_dtype", (np.bool_, np.int8, np.uint8, np.int64, np.uint64) +) +@pytest.mark.parametrize( + "divisor_dtype", + ( + pytest.param( + np.bool_, + marks=pytest.mark.xfail( + reason=( + "Pandas handling of division by zero-bool is too strange" + ) + ), + ), + np.int8, + np.uint8, + np.int64, + np.uint64, + ), +) +@pytest.mark.parametrize("scalar_divisor", [False, True]) +def test_floordiv_zero_float64(series_dtype, divisor_dtype, scalar_divisor): + sr = pd.Series([1, 2, 3], dtype=series_dtype) + cr = cudf.from_pandas(sr) + + if scalar_divisor: + pd_div = divisor_dtype(0) + cudf_div = cudf.Scalar(0, dtype=divisor_dtype) + else: + pd_div = pd.Series([0], dtype=divisor_dtype) + cudf_div = cudf.from_pandas(pd_div) + utils.assert_eq((sr // pd_div), (cr // cudf_div)) + + +@pytest.mark.parametrize( + "dtype", + ( + pytest.param( + np.bool_, + marks=pytest.mark.xfail( + reason=( + "Pandas handling of division by zero-bool is too strange" + ) + ), + ), + np.int8, + np.uint8, + np.int64, + np.uint64, + np.float32, + np.float64, + ), +) +def test_rmod_zero_nan(dtype): + sr = pd.Series([1, 1, 0], dtype=dtype) + cr = cudf.from_pandas(sr) + utils.assert_eq(1 % sr, 1 % cr) + expected_dtype = np.float64 if cr.dtype.kind != "f" else dtype + utils.assert_eq(1 % cr, cudf.Series([0, 0, None], dtype=expected_dtype)) + + def test_series_misc_binop(): pds = pd.Series([1, 2, 4], name="abc xyz") gds = cudf.Series([1, 2, 4], name="abc xyz") diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index c902bcb8b47..2525f055738 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -1995,6 +1995,12 @@ def test_set_bool_error(dtype, bool_scalar): ) +def test_int64_equality(): + s = cudf.Series(np.asarray([2**63 - 10, 2**63 - 100], dtype=np.int64)) + assert (s != np.int64(2**63 - 1)).all() + assert (s != cudf.Scalar(2**63 - 1, dtype=np.int64)).all() + + @pytest.mark.parametrize("into", [dict, OrderedDict, defaultdict(list)]) def test_series_to_dict(into): gs = cudf.Series(["ab", "de", "zx"], index=[10, 20, 100]) diff --git a/python/cudf/cudf/tests/test_timedelta.py b/python/cudf/cudf/tests/test_timedelta.py index 23270875a92..c1b603e34f2 100644 --- a/python/cudf/cudf/tests/test_timedelta.py +++ b/python/cudf/cudf/tests/test_timedelta.py @@ -400,12 +400,7 @@ def test_timedelta_dataframe_ops(df, op): [1], [12, 11, 232, 223432411, 2343241, 234324, 23234], [12, 11, 2.32, 2234.32411, 2343.241, 23432.4, 23234], - pytest.param( - [1.321, 1132.324, 23223231.11, 233.41, 0.2434, 332, 323], - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/5938" - ), - ), + [1.321, 1132.324, 23223231.11, 233.41, 332, 323], [12, 11, 2.32, 2234.32411, 2343.241, 23432.4, 23234], ], ) @@ -492,6 +487,36 @@ def test_timedelta_series_ops_with_scalars(data, other_scalars, dtype, op): assert_eq(expected, actual) +@pytest.mark.parametrize( + "reverse", + [ + False, + pytest.param( + True, + marks=pytest.mark.xfail( + strict=True, + reason=( + "timedelta modulo by zero is dubiously defined in " + "both pandas and cuDF " + "(see https://github.com/rapidsai/cudf/issues/5938)" + ), + ), + ), + ], +) +def test_timedelta_series_mod_with_scalar_zero(reverse): + gsr = cudf.Series(data=[0.2434], dtype=np.timedelta64(1, "ns")) + psr = gsr.to_pandas() + scalar = datetime.timedelta(days=768) + if reverse: + expected = scalar % psr + actual = scalar % gsr + else: + expected = psr % scalar + actual = gsr % scalar + assert_eq(expected, actual) + + @pytest.mark.parametrize( "data", [ @@ -597,6 +622,37 @@ def test_timedelta_series_ops_with_cudf_scalars(data, cpu_scalar, dtype, op): assert_eq(expected, actual) +@pytest.mark.parametrize( + "reverse", + [ + False, + pytest.param( + True, + marks=pytest.mark.xfail( + strict=True, + reason=( + "timedelta modulo by zero is dubiously defined in " + "both pandas and cuDF " + "(see https://github.com/rapidsai/cudf/issues/5938)" + ), + ), + ), + ], +) +def test_timedelta_series_mod_with_cudf_scalar_zero(reverse): + gsr = cudf.Series(data=[0.2434], dtype=np.timedelta64(1, "ns")) + psr = gsr.to_pandas() + scalar = datetime.timedelta(days=768) + gpu_scalar = cudf.Scalar(scalar) + if reverse: + expected = scalar % psr + actual = gpu_scalar % gsr + else: + expected = psr % scalar + actual = gsr % gpu_scalar + assert_eq(expected, actual) + + @pytest.mark.parametrize( "data", [ @@ -812,7 +868,8 @@ def test_timedelta_datetime_index_ops_misc( pytest.param( "floordiv", marks=pytest.mark.xfail( - reason="https://github.com/pandas-dev/pandas/issues/35529" + condition=not PANDAS_GE_120, + reason="https://github.com/pandas-dev/pandas/issues/35529", ), ), ], @@ -850,7 +907,35 @@ def test_timedelta_index_ops_with_scalars(data, other_scalars, dtype, op): expected = other_scalars // ptdi actual = other_scalars // gtdi - assert_eq(expected, actual) + if op == "floordiv": + # Hand-coding pytest.xfail behaviour for certain combinations + if ( + 0 in ptdi.astype("int") + and np.timedelta64(other_scalars).item() is not None + ): + with pytest.raises(AssertionError): + # Related to https://github.com/rapidsai/cudf/issues/5938 + # + # Division by zero for datetime or timedelta is + # dubiously defined in both pandas (Any // 0 -> 0 in + # pandas) and cuDF (undefined behaviour) + assert_eq(expected, actual) + elif ( + (None not in ptdi) + and np.nan not in expected + and ( + expected.astype("float64").astype("int64") + != expected.astype("int64") + ).any() + ): + with pytest.raises(AssertionError): + # Incorrect implementation of floordiv in cuDF: + # https://github.com/rapidsai/cudf/issues/12120 + assert_eq(expected, actual) + else: + assert_eq(expected, actual) + else: + assert_eq(expected, actual) @pytest.mark.parametrize("data", _TIMEDELTA_DATA_NON_OVERFLOW) @@ -876,12 +961,12 @@ def test_timedelta_index_ops_with_scalars(data, other_scalars, dtype, op): pytest.param( "floordiv", marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/5938" + condition=not PANDAS_GE_120, + reason="https://github.com/pandas-dev/pandas/issues/35529", ), ), ], ) -@pytest.mark.filterwarnings("ignore:divide by zero:RuntimeWarning:pandas") def test_timedelta_index_ops_with_cudf_scalars(data, cpu_scalar, dtype, op): gtdi = cudf.Index(data=data, dtype=dtype) ptdi = gtdi.to_pandas() @@ -916,7 +1001,35 @@ def test_timedelta_index_ops_with_cudf_scalars(data, cpu_scalar, dtype, op): expected = cpu_scalar // ptdi actual = gpu_scalar // gtdi - assert_eq(expected, actual) + if op == "floordiv": + # Hand-coding pytest.xfail behaviour for certain combinations + if ( + 0 in ptdi.astype("int") + and np.timedelta64(cpu_scalar).item() is not None + ): + with pytest.raises(AssertionError): + # Related to https://github.com/rapidsai/cudf/issues/5938 + # + # Division by zero for datetime or timedelta is + # dubiously defined in both pandas (Any // 0 -> 0 in + # pandas) and cuDF (undefined behaviour) + assert_eq(expected, actual) + elif ( + (None not in ptdi) + and np.nan not in expected + and ( + expected.astype("float64").astype("int64") + != expected.astype("int64") + ).any() + ): + with pytest.raises(AssertionError): + # Incorrect implementation of floordiv in cuDF: + # https://github.com/rapidsai/cudf/issues/12120 + assert_eq(expected, actual) + else: + assert_eq(expected, actual) + else: + assert_eq(expected, actual) @pytest.mark.parametrize("data", _TIMEDELTA_DATA) From 742093e3af975c297ce2ed2d9e6c14df2b464a3b Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Wed, 16 Nov 2022 10:59:05 -0600 Subject: [PATCH 2/4] Support `+` in `strings_udf` (#12117) This PR adds support for the following operator `strings_udf`: - `st + other` Part of https://github.com/rapidsai/cudf/issues/9639 Authors: - https://github.com/brandon-b-miller - David Wendt (https://github.com/davidwendt) Approvers: - David Wendt (https://github.com/davidwendt) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/12117 --- python/cudf/cudf/core/udf/strings_typing.py | 10 +++++++ python/cudf/cudf/tests/test_udf_masked_ops.py | 9 ++++++ .../strings_udf/cpp/src/strings/udf/shim.cu | 13 +++++++++ python/strings_udf/strings_udf/_typing.py | 5 ++++ python/strings_udf/strings_udf/lowering.py | 28 +++++++++++++++++++ .../strings_udf/tests/test_string_udfs.py | 16 +++++++++++ 6 files changed, 81 insertions(+) diff --git a/python/cudf/cudf/core/udf/strings_typing.py b/python/cudf/cudf/core/udf/strings_typing.py index f8f50600b12..e8a35c12f71 100644 --- a/python/cudf/cudf/core/udf/strings_typing.py +++ b/python/cudf/cudf/core/udf/strings_typing.py @@ -59,6 +59,16 @@ def len_typing(self, args, kws): return nb_signature(size_type, args[0]) +@register_string_function(operator.add) +def concat_typing(self, args, kws): + if _is_valid_string_arg(args[0]) and _is_valid_string_arg(args[1]): + return nb_signature( + MaskedType(udf_string), + MaskedType(string_view), + MaskedType(string_view), + ) + + @register_string_function(operator.contains) def contains_typing(self, args, kws): if _is_valid_string_arg(args[0]) and _is_valid_string_arg(args[1]): diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 7af47f981d6..fbe6b3f8888 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -903,6 +903,15 @@ def func(row): run_masked_udf_test(func, str_udf_data, check_dtype=False) +@string_udf_test +@pytest.mark.parametrize("concat_char", ["1", "a", "12", " ", "", ".", "@"]) +def test_string_udf_concat(str_udf_data, concat_char): + def func(row): + return row["str_col"] + concat_char + + run_masked_udf_test(func, str_udf_data, check_dtype=False) + + @pytest.mark.parametrize( "data", [[1.0, 0.0, 1.5], [1, 0, 2], [True, False, True]] ) diff --git a/python/strings_udf/cpp/src/strings/udf/shim.cu b/python/strings_udf/cpp/src/strings/udf/shim.cu index 21998d59bbb..8fc158d7eb7 100644 --- a/python/strings_udf/cpp/src/strings/udf/shim.cu +++ b/python/strings_udf/cpp/src/strings/udf/shim.cu @@ -270,3 +270,16 @@ extern "C" __device__ int rstrip(int* nb_retval, return 0; } + +extern "C" __device__ int concat(int* nb_retval, void* udf_str, void* const* lhs, void* const* rhs) +{ + auto lhs_ptr = reinterpret_cast(lhs); + auto rhs_ptr = reinterpret_cast(rhs); + + auto udf_str_ptr = new (udf_str) udf_string; + + udf_string result; + result.append(*lhs_ptr).append(*rhs_ptr); + *udf_str_ptr = result; + return 0; +} diff --git a/python/strings_udf/strings_udf/_typing.py b/python/strings_udf/strings_udf/_typing.py index a309a9cb93c..b678db88b95 100644 --- a/python/strings_udf/strings_udf/_typing.py +++ b/python/strings_udf/strings_udf/_typing.py @@ -159,8 +159,13 @@ def generic(self, args, kws): register_stringview_binaryop(operator.gt, types.boolean) register_stringview_binaryop(operator.le, types.boolean) register_stringview_binaryop(operator.ge, types.boolean) + +# st in other register_stringview_binaryop(operator.contains, types.boolean) +# st + other +register_stringview_binaryop(operator.add, udf_string) + def create_binary_attr(attrname, retty): """ diff --git a/python/strings_udf/strings_udf/lowering.py b/python/strings_udf/strings_udf/lowering.py index a6d43ece1c5..9e34b61e6da 100644 --- a/python/strings_udf/strings_udf/lowering.py +++ b/python/strings_udf/strings_udf/lowering.py @@ -25,6 +25,9 @@ # CUDA function declarations # read-only (input is a string_view, output is a fixed with type) _string_view_len = cuda.declare_device("len", size_type(_STR_VIEW_PTR)) +_concat_string_view = cuda.declare_device( + "concat", types.void(_UDF_STRING_PTR, _STR_VIEW_PTR, _STR_VIEW_PTR) +) def _declare_binary_func(lhs, rhs, out, name): @@ -160,6 +163,31 @@ def len_impl(context, builder, sig, args): return result +def call_concat_string_view(result, lhs, rhs): + return _concat_string_view(result, lhs, rhs) + + +@cuda_lower(operator.add, string_view, string_view) +def concat_impl(context, builder, sig, args): + lhs_ptr = builder.alloca(args[0].type) + rhs_ptr = builder.alloca(args[1].type) + builder.store(args[0], lhs_ptr) + builder.store(args[1], rhs_ptr) + + udf_str_ptr = builder.alloca(default_manager[udf_string].get_value_type()) + _ = context.compile_internal( + builder, + call_concat_string_view, + types.void(_UDF_STRING_PTR, _STR_VIEW_PTR, _STR_VIEW_PTR), + (udf_str_ptr, lhs_ptr, rhs_ptr), + ) + + result = cgutils.create_struct_proxy(udf_string)( + context, builder, value=builder.load(udf_str_ptr) + ) + return result._getvalue() + + def create_binary_string_func(binary_func, retty): """ Provide a wrapper around numba's low-level extension API which diff --git a/python/strings_udf/strings_udf/tests/test_string_udfs.py b/python/strings_udf/strings_udf/tests/test_string_udfs.py index 522433d404f..49663ee02ec 100644 --- a/python/strings_udf/strings_udf/tests/test_string_udfs.py +++ b/python/strings_udf/strings_udf/tests/test_string_udfs.py @@ -302,3 +302,19 @@ def func(st): return st.rstrip(strip_char) run_udf_test(data, func, "str") + + +@pytest.mark.parametrize("concat_char", ["1", "a", "12", " ", "", ".", "@"]) +def test_string_udf_concat(data, concat_char): + def func(st): + return st + concat_char + + run_udf_test(data, func, "str") + + +@pytest.mark.parametrize("concat_char", ["1", "a", "12", " ", "", ".", "@"]) +def test_string_udf_concat_reflected(data, concat_char): + def func(st): + return concat_char + st + + run_udf_test(data, func, "str") From 6ad57524ed6ab5037755491839dab53f00b8158a Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 16 Nov 2022 11:00:21 -0600 Subject: [PATCH 3/4] Use rapidsai CODE_OF_CONDUCT.md (#12166) This repo's `CODE_OF_CONDUCT.md` is superseded by an organization-wide policy: https://github.com/rapidsai/.github/pull/3 Authors: - Bradley Dice (https://github.com/bdice) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/12166 --- CODE_OF_CONDUCT.md | 1 - 1 file changed, 1 deletion(-) delete mode 100644 CODE_OF_CONDUCT.md diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md deleted file mode 100644 index 563581d270d..00000000000 --- a/CODE_OF_CONDUCT.md +++ /dev/null @@ -1 +0,0 @@ -This project has adopted the [Contributor Covenant Code of Conduct](https://docs.rapids.ai/resources/conduct/). From defad5eac490a90be5525c578ce954b2095f2812 Mon Sep 17 00:00:00 2001 From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> Date: Wed, 16 Nov 2022 23:43:44 +0530 Subject: [PATCH 4/4] byte_range support for JSON Lines format (#12017) This PR adds support for byte_range to be used in nested JSON parser for JSON Lines format (newline delimited JSON http://ndjson.org/) The record delimiter "New lines" are only expected at the end of each record. Newlines in middle of record or within quotes are not expected and will lead to unknown behaviour. The record delimiters are not context aware in this PR. This PR provides libcudf APIs, Cython APIs and python tests to enable byte range support. This will allow dask to do distributed/segmented parsing of JSON. No Dask changes Addresses part of https://github.com/rapidsai/cudf/issues/11843 Depends on #12060 Authors: - Karthikeyan (https://github.com/karthikeyann) Approvers: - Elias Stehle (https://github.com/elstehle) - Lawrence Mitchell (https://github.com/wence-) - Robert Maynard (https://github.com/robertmaynard) URL: https://github.com/rapidsai/cudf/pull/12017 --- cpp/CMakeLists.txt | 1 + .../io/json/experimental/byte_range_info.cu | 36 +++++ cpp/src/io/json/experimental/read_json.cpp | 98 +++++++++++++- cpp/src/io/json/experimental/read_json.hpp | 11 +- cpp/tests/CMakeLists.txt | 2 +- cpp/tests/io/json_chunked_reader.cpp | 128 ++++++++++++++++++ python/cudf/cudf/tests/test_json.py | 61 +++++++-- 7 files changed, 318 insertions(+), 19 deletions(-) create mode 100644 cpp/src/io/json/experimental/byte_range_info.cu create mode 100644 cpp/tests/io/json_chunked_reader.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7e8ee5b60bf..c52248c1eab 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -329,6 +329,7 @@ add_library( src/io/json/json_tree.cu src/io/json/nested_json_gpu.cu src/io/json/reader_impl.cu + src/io/json/experimental/byte_range_info.cu src/io/json/experimental/read_json.cpp src/io/orc/aggregate_orc_metadata.cpp src/io/orc/dict_enc.cu diff --git a/cpp/src/io/json/experimental/byte_range_info.cu b/cpp/src/io/json/experimental/byte_range_info.cu new file mode 100644 index 00000000000..d6e30d090a5 --- /dev/null +++ b/cpp/src/io/json/experimental/byte_range_info.cu @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +namespace cudf::io::detail::json::experimental { + +// Extract the first character position in the string. +size_type find_first_delimiter(device_span d_data, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto const first_delimiter_position = + thrust::find(rmm::exec_policy(stream), d_data.begin(), d_data.end(), delimiter); + return first_delimiter_position != d_data.end() ? first_delimiter_position - d_data.begin() : -1; +} + +} // namespace cudf::io::detail::json::experimental diff --git a/cpp/src/io/json/experimental/read_json.cpp b/cpp/src/io/json/experimental/read_json.cpp index b0b7d5baa0f..87d196131ca 100644 --- a/cpp/src/io/json/experimental/read_json.cpp +++ b/cpp/src/io/json/experimental/read_json.cpp @@ -64,19 +64,105 @@ std::vector ingest_raw_input(host_span> con } } +size_type find_first_delimiter_in_chunk(host_span> sources, + json_reader_options const& reader_opts, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto const buffer = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size()); + auto d_data = rmm::device_uvector(buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(), + buffer.data(), + buffer.size() * sizeof(decltype(buffer)::value_type), + cudaMemcpyHostToDevice, + stream.value())); + return find_first_delimiter(d_data, delimiter, stream); +} + +size_type find_first_delimiter_in_chunk(host_span buffer, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto d_data = rmm::device_uvector(buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(), + buffer.data(), + buffer.size() * sizeof(decltype(buffer)::value_type), + cudaMemcpyHostToDevice, + stream.value())); + return find_first_delimiter(d_data, delimiter, stream); +} + +bool should_load_whole_source(json_reader_options const& reader_opts) +{ + return reader_opts.get_byte_range_offset() == 0 and // + reader_opts.get_byte_range_size() == 0; +} + +/** + * @brief Get the byte range between record starts and ends starting from the given range. + * + * if get_byte_range_offset == 0, then we can skip the first delimiter search + * if get_byte_range_offset != 0, then we need to search for the first delimiter in given range. + * if not found, skip this chunk, if found, then search for first delimiter in next range until we + * find a delimiter. Use this as actual range for parsing. + * + * @param sources Data sources to read from + * @param reader_opts JSON reader options with range offset and range size + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Byte range for parsing + */ +auto get_record_range_raw_input(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream) +{ + auto buffer = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size()); + if (should_load_whole_source(reader_opts)) return buffer; + auto first_delim_pos = reader_opts.get_byte_range_offset() == 0 + ? 0 + : find_first_delimiter_in_chunk(buffer, '\n', stream); + if (first_delim_pos == -1) { + return std::vector{}; + } else { + first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset(); + // Find next delimiter + decltype(first_delim_pos) next_delim_pos = -1; + auto const total_source_size = sources_size(sources, 0, 0); + auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); + while (current_offset < total_source_size and next_delim_pos == -1) { + buffer = ingest_raw_input( + sources, reader_opts.get_compression(), current_offset, reader_opts.get_byte_range_size()); + next_delim_pos = find_first_delimiter_in_chunk(buffer, '\n', stream); + if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); } + } + if (next_delim_pos == -1) { + next_delim_pos = total_source_size; + } else { + next_delim_pos = next_delim_pos + current_offset; + } + return ingest_raw_input( + sources, reader_opts.get_compression(), first_delim_pos, next_delim_pos - first_delim_pos); + } +} + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0, - "specifying a byte range is not yet supported"); + if (not should_load_whole_source(reader_opts)) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "specifying a byte range is supported only for json lines"); + } + + auto const buffer = get_record_range_raw_input(sources, reader_opts, stream); - auto const buffer = ingest_raw_input(sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size()); auto data = host_span(reinterpret_cast(buffer.data()), buffer.size()); try { diff --git a/cpp/src/io/json/experimental/read_json.hpp b/cpp/src/io/json/experimental/read_json.hpp index c9f74b2cc41..48e104c4254 100644 --- a/cpp/src/io/json/experimental/read_json.hpp +++ b/cpp/src/io/json/experimental/read_json.hpp @@ -33,4 +33,13 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); -} +size_type find_first_delimiter(device_span d_data, + char const delimiter, + rmm::cuda_stream_view stream); + +size_type find_first_delimiter_in_chunk(host_span> sources, + json_reader_options const& reader_opts, + char const delimiter, + rmm::cuda_stream_view stream); + +} // namespace cudf::io::detail::json::experimental diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 5ff2e9bf6d6..c602ccc7374 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -224,7 +224,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp) ConfigureTest(FILE_IO_TEST io/file_io_test.cpp) ConfigureTest(ORC_TEST io/orc_test.cpp) ConfigureTest(PARQUET_TEST io/parquet_test.cpp) -ConfigureTest(JSON_TEST io/json_test.cpp) +ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp) ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) diff --git a/cpp/tests/io/json_chunked_reader.cpp b/cpp/tests/io/json_chunked_reader.cpp new file mode 100644 index 00000000000..28b41c5691f --- /dev/null +++ b/cpp/tests/io/json_chunked_reader.cpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +/** + * @brief Base test fixture for JSON reader tests + */ +struct JsonReaderTest : public cudf::test::BaseFixture { +}; + +// function to extract first delimiter in the string in each chunk, +// collate together and form byte_range for each chunk, +// parse separately. +std::vector skeleton_for_parellel_chunk_reader( + cudf::host_span> sources, + cudf::io::json_reader_options const& reader_opts, + int32_t chunk_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + using namespace cudf::io::detail::json::experimental; + using cudf::size_type; + // assuming single source. + size_t total_source_size = 0; + for (auto const& source : sources) { + total_source_size += source->size(); + } + size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; + constexpr size_type no_min_value = -1; + + // Get the first delimiter in each chunk. + std::vector first_delimiter_index(num_chunks); + auto reader_opts_chunk = reader_opts; + for (size_t i = 0; i < num_chunks; i++) { + auto const chunk_start = i * chunk_size; + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_size); + first_delimiter_index[i] = + find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream); + if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; } + } + + // Process and allocate record start, end for each worker. + using record_range = std::pair; + std::vector record_ranges; + record_ranges.reserve(num_chunks); + first_delimiter_index[0] = 0; + auto prev = first_delimiter_index[0]; + for (size_t i = 1; i < num_chunks; i++) { + if (first_delimiter_index[i] == no_min_value) continue; + record_ranges.push_back({prev, first_delimiter_index[i]}); + prev = first_delimiter_index[i]; + } + record_ranges.push_back({prev, total_source_size}); + + std::vector tables; + // Process each chunk in parallel. + for (auto const [chunk_start, chunk_end] : record_ranges) { + if (chunk_start == -1 or chunk_end == -1) continue; + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(read_json(sources, reader_opts_chunk, stream, mr)); + } + // assume all records have same number of columns, and inferred same type. (or schema is passed) + // TODO a step before to merge all columns, types and infer final schema. + return tables; +} + +TEST_F(JsonReaderTest, ByteRange) +{ + std::string const json_string = R"( + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } + { "a": { "y" : 6}, "b" : [6 ], "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; + + // Initialize parsing options (reading json lines) + cudf::io::json_reader_options json_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{json_string.c_str(), json_string.size()}) + .compression(cudf::io::compression_type::NONE) + .lines(true) + .experimental(true); + + // Read full test data via existing, nested JSON lines reader + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().buffers()); + + // Test for different chunk sizes + for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { + auto const tables = skeleton_for_parellel_chunk_reader(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + + auto table_views = std::vector(tables.size()); + std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { + return table.tbl->view(); + }); + auto result = cudf::concatenate(table_views); + + // Verify that the data read via chunked reader matches the data read via nested JSON reader + // cannot use EQUAL due to concatenate removing null mask + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view()); + } +} diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index 14238be7bc1..2eda71c5c45 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -929,21 +929,60 @@ def test_json_dtypes_nested_data(): ( "missing", """ - { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } - { "a": { "y" : 6}, "b" : [4, 5 ]} - { "a": { "y" : 6}, "c": 13 } - { "a": { "y" : 6}, "b" : [7 ], "c": 14 } - """, + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ] } + { "a": { "y" : 6}, "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14 } +""", + ), + pytest.param( + "dtype_mismatch", + """\ + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } + { "a": { "y" : 6}, "b" : [6 ], "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14.0 }""", ), ], ) -def test_order_nested_json_reader(tag, data): - expected = cudf.read_json(StringIO(data), engine="pandas", lines=True) - target = cudf.read_json( - StringIO(data), engine="cudf_experimental", lines=True - ) +class TestNestedJsonReaderCommon: + @pytest.mark.parametrize("chunk_size", [10, 100, 1024, 1024 * 1024]) + def test_chunked_nested_json_reader(self, tag, data, chunk_size): + expected = cudf.read_json( + StringIO(data), engine="cudf_experimental", lines=True + ) - assert_eq(expected, target, check_dtype=True) + source_size = len(data) + chunks = [] + for chunk_start in range(0, source_size, chunk_size): + chunks.append( + cudf.read_json( + StringIO(data), + engine="cudf_experimental", + byte_range=[chunk_start, chunk_size], + lines=True, + ) + ) + df = cudf.concat(chunks, ignore_index=True) + if tag == "missing" and chunk_size == 10: + with pytest.raises(AssertionError): + # nested JSON reader inferences integer with nulls as float64 + assert expected.to_arrow().equals(df.to_arrow()) + else: + assert expected.to_arrow().equals(df.to_arrow()) + + def test_order_nested_json_reader(self, tag, data): + expected = pd.read_json(StringIO(data), lines=True) + target = cudf.read_json( + StringIO(data), engine="cudf_experimental", lines=True + ) + if tag == "dtype_mismatch": + with pytest.raises(AssertionError): + # pandas parses integer values in float representation + # as integer + assert pa.Table.from_pandas(expected).equals(target.to_arrow()) + else: + assert pa.Table.from_pandas(expected).equals(target.to_arrow()) def test_json_round_trip_gzip():