From b1de945f9c6e3aa810d4b1ef7d53d0b4b88f7da3 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Thu, 6 Jan 2022 06:26:42 -0800 Subject: [PATCH 1/9] Remove deprecated method `one_hot_encoding` (#9977) This PR removes deprecated method `one_hot_encoding` and its test cases. There is a test case for generic indexed dataframe in `one_hot_encoding` that was not covered in `get_dummies`, this PR adds that coverage. Authors: - Michael Wang (https://github.com/isVoid) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/9977 --- python/cudf/cudf/core/dataframe.py | 72 ------------------ python/cudf/cudf/core/reshape.py | 12 +-- python/cudf/cudf/core/series.py | 77 ------------------- python/cudf/cudf/tests/test_onehot.py | 105 +++----------------------- 4 files changed, 16 insertions(+), 250 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index d97ea456f72..3366a0af4ba 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -3051,78 +3051,6 @@ def as_matrix(self, columns=None): ) return self.as_gpu_matrix(columns=columns).copy_to_host() - def one_hot_encoding( - self, column, prefix, cats, prefix_sep="_", dtype="float64" - ): - """ - Expand a column with one-hot-encoding. - - Parameters - ---------- - - column : str - the source column with binary encoding for the data. - prefix : str - the new column name prefix. - cats : sequence of ints - the sequence of categories as integers. - prefix_sep : str - the separator between the prefix and the category. - dtype : - the dtype for the outputs; defaults to float64. - - Returns - ------- - - a new dataframe with new columns append for each category. - - Examples - -------- - >>> import pandas as pd - >>> import cudf - >>> pet_owner = [1, 2, 3, 4, 5] - >>> pet_type = ['fish', 'dog', 'fish', 'bird', 'fish'] - >>> df = pd.DataFrame({'pet_owner': pet_owner, 'pet_type': pet_type}) - >>> df.pet_type = df.pet_type.astype('category') - - Create a column with numerically encoded category values - - >>> df['pet_codes'] = df.pet_type.cat.codes - >>> gdf = cudf.from_pandas(df) - - Create the list of category codes to use in the encoding - - >>> codes = gdf.pet_codes.unique() - >>> gdf.one_hot_encoding('pet_codes', 'pet_dummy', codes).head() - pet_owner pet_type pet_codes pet_dummy_0 pet_dummy_1 pet_dummy_2 - 0 1 fish 2 0.0 0.0 1.0 - 1 2 dog 1 0.0 1.0 0.0 - 2 3 fish 2 0.0 0.0 1.0 - 3 4 bird 0 1.0 0.0 0.0 - 4 5 fish 2 0.0 0.0 1.0 - """ - - warnings.warn( - "DataFrame.one_hot_encoding is deprecated and will be removed in " - "future, use `get_dummies` instead.", - FutureWarning, - ) - - if hasattr(cats, "to_arrow"): - cats = cats.to_arrow().to_pylist() - else: - cats = pd.Series(cats, dtype="object") - - newnames = [ - prefix_sep.join([prefix, "null" if cat is None else str(cat)]) - for cat in cats - ] - newcols = self[column].one_hot_encoding(cats=cats, dtype=dtype) - outdf = self.copy() - for name, col in zip(newnames, newcols): - outdf.insert(len(outdf._data), name, col) - return outdf - def label_encoding( self, column, prefix, cats, prefix_sep="_", dtype=None, na_sentinel=-1 ): diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index b2fac7a6140..1733a6c0b9a 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -600,18 +600,18 @@ def get_dummies( df : array-like, Series, or DataFrame Data of which to get dummy indicators. prefix : str, dict, or sequence, optional - prefix to append. Either a str (to apply a constant prefix), dict + Prefix to append. Either a str (to apply a constant prefix), dict mapping column names to prefixes, or sequence of prefixes to apply with the same length as the number of columns. If not supplied, defaults to the empty string prefix_sep : str, dict, or sequence, optional, default '_' - separator to use when appending prefixes + Separator to use when appending prefixes dummy_na : boolean, optional Add a column to indicate Nones, if False Nones are ignored. cats : dict, optional - dictionary mapping column names to sequences of integers representing - that column's category. See `cudf.DataFrame.one_hot_encoding` for more - information. if not supplied, it will be computed + Dictionary mapping column names to sequences of values representing + that column's category. If not supplied, it is computed as the unique + values of the column. sparse : boolean, optional Right now this is NON-FUNCTIONAL argument in rapids. drop_first : boolean, optional @@ -621,7 +621,7 @@ def get_dummies( columns. Note this is different from pandas default behavior, which encodes all columns with dtype object or categorical dtype : str, optional - output dtype, default 'uint8' + Output dtype, default 'uint8' Examples -------- diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index fb86cf85c4c..178c40b3cd8 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -2264,83 +2264,6 @@ def reverse(self): {self.name: self._column[rinds]}, self.index._values[rinds] ) - def one_hot_encoding(self, cats, dtype="float64"): - """Perform one-hot-encoding - - Parameters - ---------- - cats : sequence of values - values representing each category. - dtype : numpy.dtype - specifies the output dtype. - - Returns - ------- - Sequence - A sequence of new series for each category. Its length is - determined by the length of ``cats``. - - Examples - -------- - >>> import cudf - >>> s = cudf.Series(['a', 'b', 'c', 'a']) - >>> s - 0 a - 1 b - 2 c - 3 a - dtype: object - >>> s.one_hot_encoding(['a', 'c', 'b']) - [0 1.0 - 1 0.0 - 2 0.0 - 3 1.0 - dtype: float64, 0 0.0 - 1 0.0 - 2 1.0 - 3 0.0 - dtype: float64, 0 0.0 - 1 1.0 - 2 0.0 - 3 0.0 - dtype: float64] - """ - - warnings.warn( - "Series.one_hot_encoding is deprecated and will be removed in " - "future, use `get_dummies` instead.", - FutureWarning, - ) - - if hasattr(cats, "to_arrow"): - cats = cats.to_pandas() - else: - cats = pd.Series(cats, dtype="object") - dtype = cudf.dtype(dtype) - - try: - cats_col = as_column(cats, nan_as_null=False, dtype=self.dtype) - except TypeError: - raise ValueError("Cannot convert `cats` as cudf column.") - - if self._column.size * cats_col.size >= np.iinfo("int32").max: - raise ValueError( - "Size limitation exceeded: series.size * category.size < " - "np.iinfo('int32').max. Consider reducing size of category" - ) - - res = libcudf.transform.one_hot_encode(self._column, cats_col) - if dtype.type == np.bool_: - return [ - Series._from_data({None: x}, index=self._index) - for x in list(res.values()) - ] - else: - return [ - Series._from_data({None: x.astype(dtype)}, index=self._index) - for x in list(res.values()) - ] - def label_encoding(self, cats, dtype=None, na_sentinel=-1): """Perform label encoding. diff --git a/python/cudf/cudf/tests/test_onehot.py b/python/cudf/cudf/tests/test_onehot.py index f2a20a73b63..2b0422ffecb 100644 --- a/python/cudf/cudf/tests/test_onehot.py +++ b/python/cudf/cudf/tests/test_onehot.py @@ -7,108 +7,23 @@ import pytest import cudf -from cudf import DataFrame, Index, Series +from cudf import DataFrame from cudf.testing import _utils as utils -def test_onehot_simple(): - np.random.seed(0) - df = DataFrame() - # Populate with data [0, 10) - df["vals"] = np.arange(10, dtype=np.int32) - # One Hot (Series) - for i, col in enumerate(df["vals"].one_hot_encoding(list(range(10)))): - arr = col.to_numpy() - # Verify 1 in the right position - np.testing.assert_equal(arr[i], 1) - # Every other slots are 0s - np.testing.assert_equal(arr[:i], 0) - np.testing.assert_equal(arr[i + 1 :], 0) - # One Hot (DataFrame) - df2 = df.one_hot_encoding( - column="vals", prefix="vals", cats=list(range(10)) - ) - assert df2.columns[0] == "vals" - for i in range(1, len(df2.columns)): - assert df2.columns[i] == "vals_%s" % (i - 1) - got = df2[df2.columns[1:]].values_host - expect = np.identity(got.shape[0]) - np.testing.assert_equal(got, expect) - - -def test_onehot_random(): - df = DataFrame() - low = 10 - high = 17 - size = 10 - df["src"] = src = np.random.randint(low=low, high=high, size=size) - df2 = df.one_hot_encoding( - column="src", prefix="out_", cats=tuple(range(10, 17)) - ) - mat = df2[df2.columns[1:]].values_host - - for val in range(low, high): - colidx = val - low - arr = mat[:, colidx] - mask = src == val - np.testing.assert_equal(arr, mask) - - -def test_onehot_masked(): - np.random.seed(0) - high = 5 - size = 100 - arr = np.random.randint(low=0, high=high, size=size) - bitmask = utils.random_bitmask(size) - bytemask = np.asarray( - utils.expand_bits_to_bytes(bitmask)[:size], dtype=np.bool_ - ) - arr[~bytemask] = -1 - - df = DataFrame() - df["a"] = Series(arr).set_mask(bitmask) - - out = df.one_hot_encoding( - "a", cats=list(range(high)), prefix="a", dtype=np.int32 - ) - - assert tuple(out.columns) == ("a", "a_0", "a_1", "a_2", "a_3", "a_4") - np.testing.assert_array_equal((out["a_0"] == 1).to_numpy(), arr == 0) - np.testing.assert_array_equal((out["a_1"] == 1).to_numpy(), arr == 1) - np.testing.assert_array_equal((out["a_2"] == 1).to_numpy(), arr == 2) - np.testing.assert_array_equal((out["a_3"] == 1).to_numpy(), arr == 3) - np.testing.assert_array_equal((out["a_4"] == 1).to_numpy(), arr == 4) - - -def test_onehot_generic_index(): - np.random.seed(0) - size = 33 - indices = np.random.randint(low=0, high=100, size=size) - df = DataFrame() - values = np.random.randint(low=0, high=4, size=size) - df["fo"] = Series(values, index=Index(indices)) - out = df.one_hot_encoding( - "fo", cats=df.fo.unique(), prefix="fo", dtype=np.int32 - ) - assert set(out.columns) == {"fo", "fo_0", "fo_1", "fo_2", "fo_3"} - np.testing.assert_array_equal(values == 0, out.fo_0.to_numpy()) - np.testing.assert_array_equal(values == 1, out.fo_1.to_numpy()) - np.testing.assert_array_equal(values == 2, out.fo_2.to_numpy()) - np.testing.assert_array_equal(values == 3, out.fo_3.to_numpy()) - - @pytest.mark.parametrize( - "data", + "data, index", [ - np.arange(10), - ["abc", "zyx", "pppp"], - [], - pd.Series(["cudf", "hello", "pandas"] * 10, dtype="category"), + (np.arange(10), None), + (["abc", "zyx", "pppp"], None), + ([], None), + (pd.Series(["cudf", "hello", "pandas"] * 10, dtype="category"), None), + (range(10), [1, 2, 3, 4, 5] * 2), ], ) -def test_get_dummies(data): - gdf = DataFrame({"x": data}) - pdf = pd.DataFrame({"x": data}) +def test_get_dummies(data, index): + gdf = DataFrame({"x": data}, index=index) + pdf = pd.DataFrame({"x": data}, index=index) encoded_expected = pd.get_dummies(pdf, prefix="test") encoded_actual = cudf.get_dummies(gdf, prefix="test") From a61fc558531693f68e18ad29dc0b73610c5d1c70 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 6 Jan 2022 10:44:09 -0600 Subject: [PATCH 2/9] Minor cleanup of unused Python functions (#9974) This PR just removes some unused internal functions and inlines some single-use functions that were defined at the wrong levels of the class hierarchy (largely `Frame` internal methods that were exclusively called in a single `DataFrame` method). Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Ashwin Srinath (https://github.com/shwina) - Bradley Dice (https://github.com/bdice) - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/9974 --- python/cudf/cudf/core/dataframe.py | 47 ++---------- python/cudf/cudf/core/frame.py | 91 ++++++---------------- python/cudf/cudf/core/indexed_frame.py | 64 ++++++++++++++++ python/cudf/cudf/core/series.py | 39 ---------- python/cudf/cudf/tests/test_dataframe.py | 97 ------------------------ 5 files changed, 93 insertions(+), 245 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 3366a0af4ba..197011e629d 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4078,45 +4078,6 @@ def apply_chunks( tpb=tpb, ) - def hash_values(self, method="murmur3"): - """Compute the hash of values in each row. - - Parameters - ---------- - method : {'murmur3', 'md5'}, default 'murmur3' - Hash function to use: - * murmur3: MurmurHash3 hash function. - * md5: MD5 hash function. - - Returns - ------- - Series - A Series with hash values. - - Examples - -------- - >>> import cudf - >>> df = cudf.DataFrame({"a": [10, 120, 30], "b": [0.0, 0.25, 0.50]}) - >>> df - a b - 0 10 0.00 - 1 120 0.25 - 2 30 0.50 - >>> df.hash_values(method="murmur3") - 0 -330519225 - 1 -397962448 - 2 -1345834934 - dtype: int32 - >>> df.hash_values(method="md5") - 0 57ce879751b5169c525907d5c563fae1 - 1 948d6221a7c4963d4be411bcead7e32b - 2 fe061786ea286a515b772d91b0dfcd70 - dtype: object - """ - return Series._from_data( - {None: self._hash(method=method)}, index=self.index - ) - def partition_by_hash(self, columns, nparts, keep_index=True): """Partition the dataframe by the hashed value of data in *columns*. @@ -4140,7 +4101,13 @@ def partition_by_hash(self, columns, nparts, keep_index=True): else self._index._num_columns ) key_indices = [self._data.names.index(k) + idx for k in columns] - outdf, offsets = self._hash_partition(key_indices, nparts, keep_index) + + output_data, output_index, offsets = libcudf.hash.hash_partition( + self, key_indices, nparts, keep_index + ) + outdf = self.__class__._from_data(output_data, output_index) + outdf._copy_type_metadata(self, include_index=keep_index) + # Slice into partition return [outdf[s:e] for s, e in zip(offsets, offsets[1:] + [None])] diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index bae15c5e9fd..539408b6afb 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -83,13 +83,6 @@ def __init__(self, data=None, index=None): def _num_columns(self) -> int: return len(self._data) - @property - def _num_indices(self) -> int: - if self._index is None: - return 0 - else: - return len(self._index_names) - @property def _num_rows(self) -> int: if self._index is not None: @@ -269,15 +262,6 @@ def shape(self): """Returns a tuple representing the dimensionality of the DataFrame.""" return self._num_rows, self._num_columns - @property - def _is_homogeneous(self): - # make sure that the dataframe has columns - if not self._data.columns: - return True - - first_type = self._data.columns[0].dtype.name - return all(x.dtype.name == first_type for x in self._data.columns) - @property def empty(self): """ @@ -580,19 +564,6 @@ def _gather( result._copy_type_metadata(self) return result - def _hash(self, method): - return libcudf.hash.hash(self, method) - - def _hash_partition( - self, columns_to_hash, num_partitions, keep_index=True - ): - output_data, output_index, offsets = libcudf.hash.hash_partition( - self, columns_to_hash, num_partitions, keep_index - ) - output = self.__class__._from_data(output_data, output_index) - output._copy_type_metadata(self, include_index=keep_index) - return output, offsets - def _as_column(self): """ _as_column : Converts a single columned Frame to Column @@ -1009,30 +980,6 @@ def mask(self, cond, other=None, inplace=False): return self.where(cond=~cond, other=other, inplace=inplace) - def _partition(self, scatter_map, npartitions, keep_index=True): - - data, index, output_offsets = libcudf.partitioning.partition( - self, scatter_map, npartitions, keep_index - ) - partitioned = self.__class__._from_data(data, index) - - # due to the split limitation mentioned - # here: https://github.com/rapidsai/cudf/issues/4607 - # we need to remove first & last elements in offsets. - # TODO: Remove this after the above issue is fixed. - output_offsets = output_offsets[1:-1] - - result = partitioned._split(output_offsets, keep_index=keep_index) - - for frame in result: - frame._copy_type_metadata(self, include_index=keep_index) - - if npartitions: - for _ in range(npartitions - len(result)): - result.append(self._empty_like(keep_index)) - - return result - def pipe(self, func, *args, **kwargs): """ Apply ``func(self, *args, **kwargs)``. @@ -1139,9 +1086,29 @@ def scatter_by_map( f"ERROR: map_size must be >= {count} (got {map_size})." ) - tables = self._partition(map_index, map_size, keep_index) + data, index, output_offsets = libcudf.partitioning.partition( + self, map_index, map_size, keep_index + ) + partitioned = self.__class__._from_data(data, index) - return tables + # due to the split limitation mentioned + # here: https://github.com/rapidsai/cudf/issues/4607 + # we need to remove first & last elements in offsets. + # TODO: Remove this after the above issue is fixed. + output_offsets = output_offsets[1:-1] + + result = partitioned._split(output_offsets, keep_index=keep_index) + + for frame in result: + frame._copy_type_metadata(self, include_index=keep_index) + + if map_size: + result += [ + self._empty_like(keep_index) + for _ in range(map_size - len(result)) + ] + + return result def dropna( self, axis=0, how="any", thresh=None, subset=None, inplace=False @@ -1499,8 +1466,6 @@ def _apply_boolean_mask(self, boolean_mask): Applies boolean mask to each row of `self`, rows corresponding to `False` is dropped """ - boolean_mask = as_column(boolean_mask) - result = self.__class__._from_data( *libcudf.stream_compaction.apply_boolean_mask( self, as_column(boolean_mask) @@ -2503,18 +2468,6 @@ def _copy_type_metadata( return self - def _copy_interval_data(self, other, include_index=True): - for name, col, other_col in zip( - self._data.keys(), self._data.values(), other._data.values() - ): - if isinstance(other_col, cudf.core.column.IntervalColumn): - self._data[name] = cudf.core.column.IntervalColumn(col) - - def _postprocess_columns(self, other, include_index=True): - self._copy_categories(other, include_index=include_index) - self._copy_struct_names(other, include_index=include_index) - self._copy_interval_data(other, include_index=include_index) - def isnull(self): """ Identify missing values. diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 4be35d960ee..ecacb1ff326 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -445,6 +445,70 @@ def sort_index( out = out.reset_index(drop=True) return self._mimic_inplace(out, inplace=inplace) + def hash_values(self, method="murmur3"): + """Compute the hash of values in this column. + + Parameters + ---------- + method : {'murmur3', 'md5'}, default 'murmur3' + Hash function to use: + * murmur3: MurmurHash3 hash function. + * md5: MD5 hash function. + + Returns + ------- + Series + A Series with hash values. + + Examples + -------- + **Series** + + >>> import cudf + >>> series = cudf.Series([10, 120, 30]) + >>> series + 0 10 + 1 120 + 2 30 + dtype: int64 + >>> series.hash_values(method="murmur3") + 0 -1930516747 + 1 422619251 + 2 -941520876 + dtype: int32 + >>> series.hash_values(method="md5") + 0 7be4bbacbfdb05fb3044e36c22b41e8b + 1 947ca8d2c5f0f27437f156cfbfab0969 + 2 d0580ef52d27c043c8e341fd5039b166 + dtype: object + + **DataFrame** + + >>> import cudf + >>> df = cudf.DataFrame({"a": [10, 120, 30], "b": [0.0, 0.25, 0.50]}) + >>> df + a b + 0 10 0.00 + 1 120 0.25 + 2 30 0.50 + >>> df.hash_values(method="murmur3") + 0 -330519225 + 1 -397962448 + 2 -1345834934 + dtype: int32 + >>> df.hash_values(method="md5") + 0 57ce879751b5169c525907d5c563fae1 + 1 948d6221a7c4963d4be411bcead7e32b + 2 fe061786ea286a515b772d91b0dfcd70 + dtype: object + """ + # Note that both Series and DataFrame return Series objects from this + # calculation, necessitating the unfortunate circular reference to the + # child class here. + return cudf.Series._from_data( + {None: libcudf.hash.hash(self, method)}, index=self.index + ) + def _gather( self, gather_map, keep_index=True, nullify=False, check_bounds=True ): diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 178c40b3cd8..a0e359d1278 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -3043,45 +3043,6 @@ def value_counts( res = res / float(res._column.sum()) return res - def hash_values(self, method="murmur3"): - """Compute the hash of values in this column. - - Parameters - ---------- - method : {'murmur3', 'md5'}, default 'murmur3' - Hash function to use: - * murmur3: MurmurHash3 hash function. - * md5: MD5 hash function. - - Returns - ------- - Series - A Series with hash values. - - Examples - -------- - >>> import cudf - >>> series = cudf.Series([10, 120, 30]) - >>> series - 0 10 - 1 120 - 2 30 - dtype: int64 - >>> series.hash_values(method="murmur3") - 0 -1930516747 - 1 422619251 - 2 -941520876 - dtype: int32 - >>> series.hash_values(method="md5") - 0 7be4bbacbfdb05fb3044e36c22b41e8b - 1 947ca8d2c5f0f27437f156cfbfab0969 - 2 d0580ef52d27c043c8e341fd5039b166 - dtype: object - """ - return Series._from_data( - {None: self._hash(method=method)}, index=self.index - ) - def quantile( self, q=0.5, interpolation="linear", exact=True, quant_index=True ): diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index f42920b7c50..73f9cb858e1 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -8696,103 +8696,6 @@ def test_dataframe_init_from_series(data, columns, index): ) -@pytest.mark.parametrize( - "data, expected", - [ - ({"a": [1, 2, 3, 4], "b": [5, 6, 7, 8], "c": [1.2, 1, 2, 3]}, False), - ({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}, True), - ({"a": ["a", "b", "c"], "b": [4, 5, 6], "c": [7, 8, 9]}, False), - ({"a": [True, False, False], "b": [False, False, True]}, True), - ({"a": [True, False, False]}, True), - ({"a": [[1, 2], [3, 4]]}, True), - ({"a": [[1, 2], [3, 4]], "b": ["a", "b"]}, False), - ({"a": [{"c": 5}, {"e": 5}], "b": [{"c": 5}, {"g": 7}]}, True), - ({}, True), - ], -) -def test_is_homogeneous_dataframe(data, expected): - actual = cudf.DataFrame(data)._is_homogeneous - - assert actual == expected - - -@pytest.mark.parametrize( - "data, indexes, expected", - [ - ( - {"a": [1, 2, 3, 4], "b": [5, 6, 7, 8], "c": [1.2, 1, 2, 3]}, - ["a", "b"], - True, - ), - ( - { - "a": [1, 2, 3, 4], - "b": [5, 6, 7, 8], - "c": [1.2, 1, 2, 3], - "d": ["hello", "world", "cudf", "rapids"], - }, - ["a", "b"], - False, - ), - ( - { - "a": ["a", "b", "c"], - "b": [4, 5, 6], - "c": [7, 8, 9], - "d": [1, 2, 3], - }, - ["a", "b"], - True, - ), - ], -) -def test_is_homogeneous_multiIndex_dataframe(data, indexes, expected): - test_dataframe = cudf.DataFrame(data).set_index(indexes) - actual = cudf.DataFrame(test_dataframe)._is_homogeneous - - assert actual == expected - - -@pytest.mark.parametrize( - "data, expected", [([1, 2, 3, 4], True), ([True, False], True)] -) -def test_is_homogeneous_series(data, expected): - actual = cudf.Series(data)._is_homogeneous - - assert actual == expected - - -@pytest.mark.parametrize( - "levels, codes, expected", - [ - ( - [["lama", "cow", "falcon"], ["speed", "weight", "length"]], - [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]], - True, - ), - ( - [[1, 2, 3], [True, False, True]], - [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]], - False, - ), - ], -) -def test_is_homogeneous_multiIndex(levels, codes, expected): - actual = cudf.MultiIndex(levels=levels, codes=codes)._is_homogeneous - - assert actual == expected - - -@pytest.mark.parametrize( - "data, expected", - [([1, 2, 3], True), (["Hello", "World"], True), ([True, False], True)], -) -def test_is_homogeneous_index(data, expected): - actual = cudf.Index(data)._is_homogeneous - - assert actual == expected - - def test_frame_series_where(): gdf = cudf.DataFrame( {"a": [1.0, 2.0, None, 3.0, None], "b": [None, 10.0, 11.0, None, 23.0]} From 23603d16804f13f16ed4cb2c45831836e080017e Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Thu, 6 Jan 2022 12:19:44 -0500 Subject: [PATCH 3/9] custreamz oauth callback for kafka (librdkafka) (#9486) Previously it was impossible to use custreamz with oauth enabled Kafka brokers. This PR adds a feature so that the user can supply a Python function which is invoked to get the oauth token, from a http endpoint for example, and then supply that token to librdkafka to be used in both the initial connection to kafka and also subsequently as the token becomes stale. This closes #9410 Authors: - Jeremy Dyer (https://github.com/jdye64) - AJ Schmidt (https://github.com/ajschmidt8) Approvers: - Robert Maynard (https://github.com/robertmaynard) - Vyas Ramasubramani (https://github.com/vyasr) - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/cudf/pull/9486 --- ci/gpu/build.sh | 2 +- conda/recipes/cudf_kafka/build.sh | 2 +- conda/recipes/cudf_kafka/meta.yaml | 15 ++-- conda/recipes/custreamz/build.sh | 2 +- conda/recipes/custreamz/meta.yaml | 18 ++--- conda/recipes/libcudf_kafka/build.sh | 2 +- conda/recipes/libcudf_kafka/meta.yaml | 4 +- cpp/libcudf_kafka/CMakeLists.txt | 13 +++- .../cmake/thirdparty/get_cudf.cmake | 2 +- .../cmake/thirdparty/get_rdkafka.cmake | 2 +- .../include/cudf_kafka/kafka_callback.hpp | 71 ++++++++++++++++++ .../include/cudf_kafka/kafka_consumer.hpp | 32 ++++++-- cpp/libcudf_kafka/src/kafka_callback.cpp | 48 ++++++++++++ cpp/libcudf_kafka/src/kafka_consumer.cpp | 46 +++++++++--- cpp/libcudf_kafka/tests/CMakeLists.txt | 7 +- .../tests/kafka_consumer_tests.cpp | 36 ++++++--- .../cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo | Bin 0 -> 12288 bytes python/cudf_kafka/cudf_kafka/_lib/kafka.pxd | 12 ++- python/cudf_kafka/cudf_kafka/_lib/kafka.pyx | 36 +++++++-- python/custreamz/custreamz/kafka.py | 17 ++--- .../custreamz/custreamz/tests/test_kafka.py | 5 +- 21 files changed, 295 insertions(+), 77 deletions(-) create mode 100644 cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp create mode 100644 cpp/libcudf_kafka/src/kafka_callback.cpp create mode 100644 python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index a557a2ef066..4ac2fe79bf6 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -89,7 +89,7 @@ gpuci_mamba_retry install -y \ "ucx-py=${UCX_PY_VERSION}" # https://docs.rapids.ai/maintainers/depmgmt/ -# gpuci_mamba_retry remove --force rapids-build-env rapids-notebook-env +# gpuci_conda_retry remove --force rapids-build-env rapids-notebook-env # gpuci_mamba_retry install -y "your-pkg=1.0.0" diff --git a/conda/recipes/cudf_kafka/build.sh b/conda/recipes/cudf_kafka/build.sh index 3db559c144d..5d8720f1c98 100644 --- a/conda/recipes/cudf_kafka/build.sh +++ b/conda/recipes/cudf_kafka/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # This assumes the script is executed from the root of the repo directory ./build.sh -v cudf_kafka diff --git a/conda/recipes/cudf_kafka/meta.yaml b/conda/recipes/cudf_kafka/meta.yaml index e450d306cbe..d434e53c9b1 100644 --- a/conda/recipes/cudf_kafka/meta.yaml +++ b/conda/recipes/cudf_kafka/meta.yaml @@ -1,9 +1,9 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} -{% set py_version=environ.get('CONDA_PY', 36) %} -{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set py_version = environ.get('python', '3.8') %} package: name: cudf_kafka @@ -14,7 +14,7 @@ source: build: number: {{ GIT_DESCRIBE_NUMBER }} - string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} + string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} script_env: - CC - CXX @@ -26,14 +26,15 @@ requirements: build: - cmake >=3.20.1 host: - - python + - python {{ py_version }} - cython >=0.29,<0.30 - - setuptools - cudf {{ version }} - libcudf_kafka {{ version }} + - setuptools run: + - python {{ py_version }} - libcudf_kafka {{ version }} - - python-confluent-kafka + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf {{ version }} test: # [linux64] diff --git a/conda/recipes/custreamz/build.sh b/conda/recipes/custreamz/build.sh index 6ce9e4f21a9..88fccf90c69 100644 --- a/conda/recipes/custreamz/build.sh +++ b/conda/recipes/custreamz/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # This assumes the script is executed from the root of the repo directory ./build.sh -v custreamz diff --git a/conda/recipes/custreamz/meta.yaml b/conda/recipes/custreamz/meta.yaml index a8b096d4892..73f4727b70b 100644 --- a/conda/recipes/custreamz/meta.yaml +++ b/conda/recipes/custreamz/meta.yaml @@ -1,9 +1,9 @@ -# Copyright (c) 2018-2019, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} -{% set py_version=environ.get('CONDA_PY', 36) %} -{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set py_version = environ.get('python', '3.8') %} package: name: custreamz @@ -14,7 +14,7 @@ source: build: number: {{ GIT_DESCRIBE_NUMBER }} - string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} + string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} script_env: - VERSION_SUFFIX - PARALLEL_LEVEL @@ -24,16 +24,16 @@ build: requirements: host: - - python - - python-confluent-kafka + - python {{ py_version }} + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf_kafka {{ version }} run: - - python - - streamz + - python {{ py_version }} + - streamz - cudf {{ version }} - dask>=2021.11.1,<=2021.11.2 - distributed>=2021.11.1,<=2021.11.2 - - python-confluent-kafka + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf_kafka {{ version }} test: # [linux64] diff --git a/conda/recipes/libcudf_kafka/build.sh b/conda/recipes/libcudf_kafka/build.sh index cbe4584cb63..b656f55a64e 100644 --- a/conda/recipes/libcudf_kafka/build.sh +++ b/conda/recipes/libcudf_kafka/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then # This assumes the script is executed from the root of the repo directory diff --git a/conda/recipes/libcudf_kafka/meta.yaml b/conda/recipes/libcudf_kafka/meta.yaml index 6b15890e7c7..0b274f3a41d 100644 --- a/conda/recipes/libcudf_kafka/meta.yaml +++ b/conda/recipes/libcudf_kafka/meta.yaml @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} @@ -26,7 +26,7 @@ requirements: - cmake >=3.20.1 host: - libcudf {{version}} - - librdkafka >=1.6.0,<1.7.0a0 + - librdkafka >=1.7.0,<1.8.0a0 run: - {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not diff --git a/cpp/libcudf_kafka/CMakeLists.txt b/cpp/libcudf_kafka/CMakeLists.txt index d0874b57c2d..e6abba207d9 100644 --- a/cpp/libcudf_kafka/CMakeLists.txt +++ b/cpp/libcudf_kafka/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -29,6 +29,10 @@ project( # Set a default build type if none was specified rapids_cmake_build_type(Release) +# ################################################################################################## +# * conda environment ----------------------------------------------------------------------------- +rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH) + # ################################################################################################## # * Build options option(BUILD_TESTS "Build tests for libcudf_kafka" ON) @@ -55,7 +59,7 @@ endif() # ################################################################################################## # * library target -------------------------------------------------------------------------------- -add_library(cudf_kafka SHARED src/kafka_consumer.cpp) +add_library(cudf_kafka SHARED src/kafka_consumer.cpp src/kafka_callback.cpp) # ################################################################################################## # * include paths --------------------------------------------------------------------------------- @@ -68,6 +72,11 @@ target_include_directories( # * library paths --------------------------------------------------------------------------------- target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA) +# Add Conda library, and include paths if specified +if(TARGET conda_env) + target_link_libraries(cudf_kafka PRIVATE conda_env) +endif() + set_target_properties( cudf_kafka PROPERTIES BUILD_RPATH "\$ORIGIN" INSTALL_RPATH "\$ORIGIN" # set target compile options diff --git a/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake b/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake index 1e04d40a7d5..aa4c5b60e7a 100644 --- a/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake +++ b/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at diff --git a/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake b/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake index 3b3342cb297..5c3c9f01f17 100644 --- a/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake +++ b/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at diff --git a/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp b/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp new file mode 100644 index 00000000000..a4ff18054b1 --- /dev/null +++ b/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +#include +#include +#include + +namespace cudf { +namespace io { +namespace external { +namespace kafka { + +/** + * @brief Python Callback function wrapper type used for Kafka OAuth events + * + * The KafkaConsumer calls the `kafka_oauth_callback_wrapper_type` when the existing + * oauth token is considered expired by the KafkaConsumer. Typically that + * means this will be invoked a single time when the KafkaConsumer is created + * to get the initial token and then intermediately as the token becomes + * expired. + * + * The callback function signature is: + * `std::map kafka_oauth_callback_wrapper_type(void*)` + * + * The callback function returns a std::map, + * where the std::map consists of the Oauth token and its + * linux epoch expiration time. Generally the token and expiration + * time is retrieved from an external service by the callback. + * Ex: [token, token_expiration_in_epoch] + */ +using kafka_oauth_callback_wrapper_type = std::map (*)(void*); +using python_callable_type = void*; + +/** + * @brief Callback to retrieve OAuth token from external source. Invoked when + * token refresh is required. + */ +class python_oauth_refresh_callback : public RdKafka::OAuthBearerTokenRefreshCb { + public: + python_oauth_refresh_callback(kafka_oauth_callback_wrapper_type callback_wrapper, + python_callable_type python_callable); + + void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config); + + private: + kafka_oauth_callback_wrapper_type callback_wrapper_; + python_callable_type python_callable_; +}; + +} // namespace kafka +} // namespace external +} // namespace io +} // namespace cudf diff --git a/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp index 464d1cd71b1..c65774d2e1a 100644 --- a/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp +++ b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,10 +15,14 @@ */ #pragma once -#include -#include +#include "kafka_callback.hpp" + #include + #include + +#include +#include #include #include #include @@ -48,8 +52,15 @@ class kafka_consumer : public cudf::io::datasource { * * @param configs key/value pairs of librdkafka configurations that will be * passed to the librdkafka client + * @param python_callable `python_callable_type` pointer to a Python functools.partial object + * @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will + * be used to invoke the `python_callable`. This wrapper serves the purpose + * of preventing us from having to link against the Python development library + * in libcudf_kafka. */ - kafka_consumer(std::map const& configs); + kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper); /** * @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be @@ -57,6 +68,11 @@ class kafka_consumer : public cudf::io::datasource { * * @param configs key/value pairs of librdkafka configurations that will be * passed to the librdkafka client + * @param python_callable `python_callable_type` pointer to a Python functools.partial object + * @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will + * be used to invoke the `python_callable`. This wrapper serves the purpose + * of preventing us from having to link against the Python development library + * in libcudf_kafka. * @param topic_name name of the Kafka topic to consume from * @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1` * inclusive @@ -66,7 +82,9 @@ class kafka_consumer : public cudf::io::datasource { * before batch_timeout, a smaller subset will be returned * @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n" */ - kafka_consumer(std::map const& configs, + kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper, std::string const& topic_name, int partition, int64_t start_offset, @@ -178,6 +196,10 @@ class kafka_consumer : public cudf::io::datasource { std::unique_ptr kafka_conf; // RDKafka configuration object std::unique_ptr consumer; + std::map configs; + python_callable_type python_callable_; + kafka_oauth_callback_wrapper_type callable_wrapper_; + std::string topic_name; int partition; int64_t start_offset; diff --git a/cpp/libcudf_kafka/src/kafka_callback.cpp b/cpp/libcudf_kafka/src/kafka_callback.cpp new file mode 100644 index 00000000000..6b98747c145 --- /dev/null +++ b/cpp/libcudf_kafka/src/kafka_callback.cpp @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "cudf_kafka/kafka_callback.hpp" + +#include + +namespace cudf { +namespace io { +namespace external { +namespace kafka { + +python_oauth_refresh_callback::python_oauth_refresh_callback( + kafka_oauth_callback_wrapper_type callback_wrapper, python_callable_type python_callable) + : callback_wrapper_(callback_wrapper), python_callable_(python_callable){}; + +void python_oauth_refresh_callback::oauthbearer_token_refresh_cb( + RdKafka::Handle* handle, std::string const& oauthbearer_config) +{ + std::map resp = callback_wrapper_(python_callable_); + + // Build parameters to pass to librdkafka + std::string token = resp["token"]; + int64_t token_lifetime_ms = std::stoll(resp["token_expiration_in_epoch"]); + std::list extensions; // currently not supported + std::string errstr; + CUDF_EXPECTS( + RdKafka::ErrorCode::ERR_NO_ERROR == + handle->oauthbearer_set_token(token, token_lifetime_ms, "kafka", extensions, errstr), + "Error occurred while setting the oauthbearer token"); +} + +} // namespace kafka +} // namespace external +} // namespace io +} // namespace cudf diff --git a/cpp/libcudf_kafka/src/kafka_consumer.cpp b/cpp/libcudf_kafka/src/kafka_consumer.cpp index 4f7cdba632e..49e89a56e60 100644 --- a/cpp/libcudf_kafka/src/kafka_consumer.cpp +++ b/cpp/libcudf_kafka/src/kafka_consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "cudf_kafka/kafka_consumer.hpp" -#include + #include + +#include #include namespace cudf { @@ -24,8 +25,13 @@ namespace io { namespace external { namespace kafka { -kafka_consumer::kafka_consumer(std::map const& configs) - : kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) +kafka_consumer::kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper) + : configs(configs), + python_callable_(python_callable), + callable_wrapper_(callable_wrapper), + kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) { for (auto const& key_value : configs) { std::string error_string; @@ -34,6 +40,14 @@ kafka_consumer::kafka_consumer(std::map const& configs "Invalid Kafka configuration"); } + if (python_callable_ != nullptr) { + std::string error_string; + python_oauth_refresh_callback cb(callable_wrapper_, python_callable_); + CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == + kafka_conf->set("oauthbearer_token_refresh_cb", &cb, error_string), + "Failed to set Kafka oauth callback"); + } + // Kafka 0.9 > requires group.id in the configuration std::string conf_val; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val), @@ -44,22 +58,26 @@ kafka_consumer::kafka_consumer(std::map const& configs RdKafka::KafkaConsumer::create(kafka_conf.get(), errstr)); } -kafka_consumer::kafka_consumer(std::map const& configs, +kafka_consumer::kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callback_wrapper, std::string const& topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const& delimiter) - : topic_name(topic_name), + : configs(configs), + python_callable_(python_callable), + callable_wrapper_(callback_wrapper), + topic_name(topic_name), partition(partition), start_offset(start_offset), end_offset(end_offset), batch_timeout(batch_timeout), - delimiter(delimiter) + delimiter(delimiter), + kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) { - kafka_conf = std::unique_ptr(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); - for (auto const& key_value : configs) { std::string error_string; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == @@ -67,6 +85,14 @@ kafka_consumer::kafka_consumer(std::map const& configs "Invalid Kafka configuration"); } + if (python_callable_ != nullptr) { + std::string error_string; + python_oauth_refresh_callback cb(callable_wrapper_, python_callable_); + CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == + kafka_conf->set("oauthbearer_token_refresh_cb", &cb, error_string), + "Failed to set Kafka oauth callback"); + } + // Kafka 0.9 > requires group.id in the configuration std::string conf_val; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val), diff --git a/cpp/libcudf_kafka/tests/CMakeLists.txt b/cpp/libcudf_kafka/tests/CMakeLists.txt index 3920758f3f2..db2131ba00c 100644 --- a/cpp/libcudf_kafka/tests/CMakeLists.txt +++ b/cpp/libcudf_kafka/tests/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -23,8 +23,9 @@ function(ConfigureTest test_name) ${test_name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" ) - target_link_libraries(${test_name} PRIVATE GTest::gmock_main GTest::gtest_main cudf_kafka) - + target_link_libraries( + ${test_name} PRIVATE GTest::gmock GTest::gmock_main GTest::gtest_main cudf_kafka + ) add_test(NAME ${test_name} COMMAND ${test_name}) endfunction() diff --git a/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp index ca4b70531db..613c2435f4d 100644 --- a/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp +++ b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "cudf_kafka/kafka_consumer.hpp" +#include #include #include #include @@ -32,25 +32,37 @@ TEST_F(KafkaDatasourceTest, MissingGroupID) { // group.id is a required configuration. std::map kafka_configs; - kafka_configs.insert({"bootstrap.servers", "localhost:9092"}); + kafka_configs["bootstrap.servers"] = "localhost:9092"; - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka::python_callable_type python_callable; + kafka::kafka_oauth_callback_wrapper_type callback_wrapper; + + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); } TEST_F(KafkaDatasourceTest, InvalidConfigValues) { // Give a made up configuration value std::map kafka_configs; - kafka_configs.insert({"completely_made_up_config", "wrong"}); + kafka_configs["completely_made_up_config"] = "wrong"; - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka::python_callable_type python_callable; + kafka::kafka_oauth_callback_wrapper_type callback_wrapper; - kafka_configs.clear(); + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); // Give a good config property with a bad value - kafka_configs.insert({"message.max.bytes", "this should be a number not text"}); - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka_configs.clear(); + kafka_configs["message.max.bytes"] = "this should be a number not text"; + + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); } diff --git a/python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo b/python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo new file mode 100644 index 0000000000000000000000000000000000000000..624b60798ae94ece673e4028670bd4d559d7c663 GIT binary patch literal 12288 zcmeI2%ZnUE9LHzI-CdpT zn%N!j5fsFW2mKEc<0D2q=v7Y&9z2?eiYJd=a`6#JB!2r{-TRnbL=dTh5B;cL{i^C) zk6-spH>Xa|&e0?FDS~uAAwS%F!+dbh!z0&^6LPjS;C^_2NOJpD%Q7ccg})L=Q(D5^ zxWKEC!^s)m=eF=1?#fBlYqcl6f$R!5IyY9#_KK;V8n)G(jPBOGwbsUt^oPi5E1(t73TOqi0$KsBfL1^&pcVLEDImQO@)~+{ zN7Ac>1fPS?fDb0XuX_l& z2rhtEz{_9(%!9|kA#md!LVf|?fXm=*;DS@&MKB5~;P%~wTm_fFD(HhMcpUt+8*K*f zfakz2@HKS%3S0yiK&az~-~%uK3>*hz-~hM|t*?Qr;1YNboCk~G5V&zCA%B9);3KdE z&VYm9Tl`4#CU_RifKgBZf8fWO-_sv#wl&8n67yQTP5Z)XQD%#PR|9Uh$EoGY!_$VO z(sH=y$r>Rgl&B?4IF2PHZ*9?%Y)G?C4jZzxhPZb_h9@0| z%=?g$D5?~WraBX&{uDZsbUmD`EmN9-==qF~MZM3jBjsw;6&=e~3M6&4xNSiue9RP8 z7rIJY-83cZ8p&iYWJelvH&u3h9=>(*Wzy6_ZnSLNqYIE z3j90E58E>drDowq&~vSGJ#Ki?U&^*gC2z<#(*?x*jMV(rzZr-y|X{wI@#nX}Qudb(4hMZ4KSJ=H&v% zbFdgtH*QNbVKBL^j$2KXjAPYuEP60bO9InuKm$b&QtD88Sq{TWk<_GgOuJPG&w1C@ zTbH8sP*XK{Z)IGJ<(pb&p&fgR;hGNL)KWCh;t7xnvnRU-Gi|$RvX$IA`qJ580uR{^ z_p9n0Plp7VZL1S(K9@GEVgt>0Tvz!qS;k_ox4IS3QkHYJgsONh$WtQU!U($R( zF59EsVHr8_c#xUr9uoiX92&;+Ju{0hV~>eNGURE!@nyln;g0YJnU6fu6s=4zGr@4H zdD;DNf#w in libcudf_kafka +# we introduce this wrapper in Cython +cdef map[string, string] oauth_callback_wrapper(void *ctx): + return ((ctx))() + + cdef class KafkaDatasource(Datasource): def __cinit__(self, - map[string, string] kafka_configs, + object kafka_configs, string topic=b"", int32_t partition=-1, int64_t start_offset=0, int64_t end_offset=0, int32_t batch_timeout=10000, string delimiter=b"",): + + cdef map[string, string] configs + cdef void* python_callable = nullptr + cdef map[string, string] (*python_callable_wrapper)(void *) + + for key in kafka_configs: + if key == 'oauth_cb': + if callable(kafka_configs[key]): + python_callable = kafka_configs[key] + python_callable_wrapper = &oauth_callback_wrapper + else: + raise TypeError("'oauth_cb' configuration must \ + be a Python callable object") + else: + configs[key.encode()] = kafka_configs[key].encode() + if topic != b"" and partition != -1: self.c_datasource = \ - make_unique[kafka_consumer](kafka_configs, + make_unique[kafka_consumer](configs, + python_callable, + python_callable_wrapper, topic, partition, start_offset, @@ -32,7 +56,9 @@ cdef class KafkaDatasource(Datasource): delimiter) else: self.c_datasource = \ - make_unique[kafka_consumer](kafka_configs) + make_unique[kafka_consumer](configs, + python_callable, + python_callable_wrapper) cdef datasource* get_datasource(self) nogil: return self.c_datasource.get() diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index a301660a2e4..f5d5031602f 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. import confluent_kafka as ck from cudf_kafka._lib.kafka import KafkaDatasource @@ -25,13 +25,7 @@ def __init__(self, kafka_configs): """ self.kafka_configs = kafka_configs - - self.kafka_confs = { - str.encode(key): str.encode(value) - for key, value in self.kafka_configs.items() - } - - self.kafka_meta_client = KafkaDatasource(self.kafka_confs) + self.kafka_meta_client = KafkaDatasource(kafka_configs) def list_topics(self, specific_topic=None): @@ -145,7 +139,7 @@ def read_gdf( ) kafka_datasource = KafkaDatasource( - self.kafka_confs, + self.kafka_configs, topic.encode(), partition, start, @@ -173,7 +167,10 @@ def read_gdf( kafka_datasource.close(batch_timeout) if result is not None: - return cudf.DataFrame._from_table(result) + if isinstance(result, cudf.DataFrame): + return result + else: + return cudf.DataFrame._from_data(result) else: # empty Dataframe return cudf.DataFrame() diff --git a/python/custreamz/custreamz/tests/test_kafka.py b/python/custreamz/custreamz/tests/test_kafka.py index d29ebf8db8b..ad3b829544b 100644 --- a/python/custreamz/custreamz/tests/test_kafka.py +++ b/python/custreamz/custreamz/tests/test_kafka.py @@ -5,11 +5,10 @@ from cudf.testing._utils import assert_eq -@pytest.mark.parametrize("commit_offset", [-1, 0, 1, 1000]) +@pytest.mark.parametrize("commit_offset", [1, 45, 100, 22, 1000, 10]) @pytest.mark.parametrize("topic", ["cudf-kafka-test-topic"]) def test_kafka_offset(kafka_client, topic, commit_offset): - ck_top = ck.TopicPartition(topic, 0, commit_offset) - offsets = [ck_top] + offsets = [ck.TopicPartition(topic, 0, commit_offset)] kafka_client.commit(offsets=offsets) # Get the offsets that were just committed to Kafka From 61199ea1196b4ac355c2746a43c6ffc007c44d52 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath <3190405+shwina@users.noreply.github.com> Date: Thu, 6 Jan 2022 14:53:01 -0500 Subject: [PATCH 4/9] Fix groupby shift/diff/fill after selecting from a `GroupBy` (#9984) Fixes https://github.com/rapidsai/cudf/issues/9969 Due to a bug in `GroupBy.__getitem__`, selecting a column of a `GroupBy` and then doing a shift, diff, or fill operation would result in the operation being performed on the wrong values. This PR fixes `GroupBy.__getitem__` so we now have the right behaviour. Authors: - Ashwin Srinath (https://github.com/shwina) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Michael Wang (https://github.com/isVoid) URL: https://github.com/rapidsai/cudf/pull/9984 --- python/cudf/cudf/core/groupby/groupby.py | 4 +-- python/cudf/cudf/tests/test_groupby.py | 40 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index f1d622362e2..08ef3f07776 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1190,7 +1190,7 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): result = self.obj.__class__._from_data( *self._groupby.shift( - cudf.core.frame.Frame(value_columns), periods, fill_value + cudf.core.frame.Frame(value_columns._data), periods, fill_value ) ) result = self._mimic_pandas_order(result) @@ -1299,7 +1299,7 @@ class DataFrameGroupBy(GroupBy, GetAttrGetItemMixin): def __getitem__(self, key): return self.obj[key].groupby( - self.grouping, dropna=self._dropna, sort=self._sort + by=self.grouping.keys, dropna=self._dropna, sort=self._sort ) def nunique(self): diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 1feaddf74e2..c73e96de470 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2362,4 +2362,44 @@ def test_groupby_get_group(pdf, group, name, obj): assert_groupby_results_equal(expected, actual) +def test_groupby_select_then_ffill(): + pdf = pd.DataFrame( + { + "a": [1, 1, 1, 2, 2], + "b": [1, None, None, 2, None], + "c": [3, None, None, 4, None], + } + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].ffill() + actual = gdf.groupby("a")["c"].ffill() + + assert_groupby_results_equal(expected, actual) + + +def test_groupby_select_then_shift(): + pdf = pd.DataFrame( + {"a": [1, 1, 1, 2, 2], "b": [1, 2, 3, 4, 5], "c": [3, 4, 5, 6, 7]} + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].shift(1) + actual = gdf.groupby("a")["c"].shift(1) + + assert_groupby_results_equal(expected, actual) + + +def test_groupby_select_then_diff(): + pdf = pd.DataFrame( + {"a": [1, 1, 1, 2, 2], "b": [1, 2, 3, 4, 5], "c": [3, 4, 5, 6, 7]} + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].diff(1) + actual = gdf.groupby("a")["c"].diff(1) + + assert_groupby_results_equal(expected, actual) + + # TODO: Add a test including datetime64[ms] column in input data From 7392f9f5f10dc6efb4d21cfcef18a14a0df421c3 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 6 Jan 2022 13:01:03 -0800 Subject: [PATCH 5/9] use ninja in java ci build (#9933) Authors: - Rong Ou (https://github.com/rongou) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Jason Lowe (https://github.com/jlowe) - Gera Shegalov (https://github.com/gerashegalov) - Peixin (https://github.com/pxLi) URL: https://github.com/rapidsai/cudf/pull/9933 --- java/ci/Dockerfile.centos7 | 2 +- java/ci/build-in-docker.sh | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/java/ci/Dockerfile.centos7 b/java/ci/Dockerfile.centos7 index 2ee57bfaeab..c1d29468f65 100644 --- a/java/ci/Dockerfile.centos7 +++ b/java/ci/Dockerfile.centos7 @@ -28,7 +28,7 @@ FROM gpuci/cuda:$CUDA_VERSION-devel-centos7 ### Install basic requirements RUN yum install -y centos-release-scl RUN yum install -y devtoolset-9 epel-release -RUN yum install -y git zlib-devel maven tar wget patch +RUN yum install -y git zlib-devel maven tar wget patch ninja-build ## pre-create the CMAKE_INSTALL_PREFIX folder, set writable by any user for Jenkins RUN mkdir /usr/local/rapids && mkdir /rapids && chmod 777 /usr/local/rapids && chmod 777 /rapids diff --git a/java/ci/build-in-docker.sh b/java/ci/build-in-docker.sh index a99b6900830..ac8b2584091 100755 --- a/java/ci/build-in-docker.sh +++ b/java/ci/build-in-docker.sh @@ -19,7 +19,6 @@ set -ex gcc --version -PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} SKIP_JAVA_TESTS=${SKIP_JAVA_TESTS:-true} BUILD_CPP_TESTS=${BUILD_CPP_TESTS:-OFF} ENABLE_CUDA_STATIC_RUNTIME=${ENABLE_CUDA_STATIC_RUNTIME:-ON} @@ -28,6 +27,7 @@ RMM_LOGGING_LEVEL=${RMM_LOGGING_LEVEL:-OFF} ENABLE_NVTX=${ENABLE_NVTX:-ON} ENABLE_GDS=${ENABLE_GDS:-OFF} OUT=${OUT:-out} +CMAKE_GENERATOR=${CMAKE_GENERATOR:-Ninja} SIGN_FILE=$1 #Set absolute path for OUT_PATH @@ -54,7 +54,9 @@ export LIBCUDF_KERNEL_CACHE_PATH=/rapids rm -rf "$WORKSPACE/cpp/build" mkdir -p "$WORKSPACE/cpp/build" cd "$WORKSPACE/cpp/build" -cmake .. -DUSE_NVTX=$ENABLE_NVTX \ +cmake .. -G"${CMAKE_GENERATOR}" \ + -DCMAKE_INSTALL_PREFIX=$INSTALL_PREFIX \ + -DUSE_NVTX=$ENABLE_NVTX \ -DCUDF_USE_ARROW_STATIC=ON \ -DCUDF_ENABLE_ARROW_S3=OFF \ -DBUILD_TESTS=$BUILD_CPP_TESTS \ @@ -62,8 +64,12 @@ cmake .. -DUSE_NVTX=$ENABLE_NVTX \ -DRMM_LOGGING_LEVEL=$RMM_LOGGING_LEVEL \ -DBUILD_SHARED_LIBS=OFF -make -j$PARALLEL_LEVEL -make install DESTDIR=$INSTALL_PREFIX +if [[ -z "${PARALLEL_LEVEL}" ]]; then + cmake --build . +else + cmake --build . --parallel $PARALLEL_LEVEL +fi +cmake --install . ###### Build cudf jar ###### BUILD_ARG="-Dmaven.repo.local=\"$WORKSPACE/.m2\"\ From 120aa62decc7a23919a9b669dbb49f63a698b47d Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Thu, 6 Jan 2022 16:37:31 -0600 Subject: [PATCH 6/9] Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. (#9931) Fixes https://github.com/NVIDIA/spark-rapids/issues/4060 Issue was relatively straightforward. There is a section of code in the bucket generation step that detects "gaps" that would be generated during the reduction step. It was incorrectly indexing into the list of cumulative weights for input values. Fundamental change was to change the `TotalWeightIter` iterator which was just returning the total weight for an input group into a `GroupInfoFunc` functor that returns total weight as well as group size info that is used to index cumulative weights correctly. Authors: - https://github.com/nvdbaranec Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Jake Hemstad (https://github.com/jrhemstad) URL: https://github.com/rapidsai/cudf/pull/9931 --- cpp/src/groupby/sort/group_tdigest.cu | 154 +++++++++++++++----------- 1 file changed, 92 insertions(+), 62 deletions(-) diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index ecb18c09f9d..b7b45341ad2 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -101,10 +101,14 @@ struct merge_centroids { * nearest whole number <= it is floor(3.56) == 3. */ struct nearest_value_scalar_weights { - thrust::pair operator() __device__(double next_limit, size_type) + offset_type const* group_offsets; + + thrust::pair operator() __device__(double next_limit, size_type group_index) { - double const f = floor(next_limit); - return {f, max(0, static_cast(next_limit) - 1)}; + double const f = floor(next_limit); + auto const relative_weight_index = max(0, static_cast(next_limit) - 1); + auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index]; + return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1}; } }; @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights { group_cumulative_weights); return index == 0 ? thrust::pair{0, 0} - : thrust::pair{group_cumulative_weights[index - 1], index - 1}; + : thrust::pair{group_cumulative_weights[index - 1], + static_cast(index) - 1}; } }; @@ -187,6 +192,39 @@ struct cumulative_centroid_weight { } }; +// retrieve group info of scalar inputs by group index +struct scalar_group_info { + size_type const* group_valid_counts; + offset_type const* group_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + return {static_cast(group_valid_counts[group_index]), + group_offsets[group_index + 1] - group_offsets[group_index], + group_offsets[group_index]}; + } +}; + +// retrieve group info of centroid inputs by group index +struct centroid_group_info { + double const* cumulative_weights; + offset_type const* outer_offsets; + offset_type const* inner_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + // if there's no weights in this group of digests at all, return 0. + auto const group_start = inner_offsets[outer_offsets[group_index]]; + auto const group_end = inner_offsets[outer_offsets[group_index + 1]]; + auto const num_weights = group_end - group_start; + auto const last_weight_index = group_end - 1; + return num_weights == 0 + ? thrust::tuple{0, num_weights, group_start} + : thrust::tuple{ + cumulative_weights[last_weight_index], num_weights, group_start}; + } +}; + struct tdigest_min { __device__ double operator()(thrust::tuple const& t) { @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm) * cluster sizes and total # of clusters, and once to compute the actual * weight limits per cluster. * - * @param delta_ tdigest compression level + * @param delta tdigest compression level * @param num_groups The number of input groups - * @param nearest_weight_ A functor which returns the nearest weight in the input + * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight_ A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total + * weight, size and start offset) * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters * @param has_nulls Whether or not the input contains nulls * */ -template -__global__ void generate_cluster_limits_kernel(int delta_, + +template +__global__ void generate_cluster_limits_kernel(int delta, size_type num_groups, NearestWeightFunc nearest_weight, - TotalWeightIter total_weight_, + GroupInfo group_info, CumulativeWeight cumulative_weight, double* group_cluster_wl, size_type* group_num_clusters, offset_type const* group_cluster_offsets, bool has_nulls) { - int const tid = threadIdx.x + blockIdx.x * blockDim.x; + int const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const group_index = tid; if (group_index >= num_groups) { return; } // we will generate at most delta clusters. - double const delta = static_cast(delta_); - double const delta_norm = delta / (2.0 * M_PI); - double const total_weight = total_weight_[group_index]; + double const delta_norm = static_cast(delta) / (2.0 * M_PI); + double total_weight; + size_type group_size, group_start; + thrust::tie(total_weight, group_size, group_start) = group_info(group_index); // start at the correct place based on our cluster offset. double* cluster_wl = @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_, double cur_limit = 0.0; double cur_weight = 0.0; double next_limit = -1.0; - int last_inserted_index = -1; + int last_inserted_index = -1; // group-relative index into the input stream // compute the first cluster limit double nearest_w; - int nearest_w_index; + int nearest_w_index; // group-relative index into the input stream while (1) { cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w); if (cur_weight >= total_weight) { break; } @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_, // during the reduction step to be trivial. // double adjusted_next_limit = next_limit; - if (nearest_w_index == last_inserted_index || last_inserted_index < 0) { - nearest_w_index = last_inserted_index + 1; - auto [r, i, adjusted] = cumulative_weight(nearest_w_index); - adjusted_next_limit = max(next_limit, adjusted); - (void)r; - (void)i; + if ((last_inserted_index < 0) || // if we haven't inserted anything yet + (nearest_w_index == + last_inserted_index)) { // if we land in the same bucket as the previous cap + + // force the value into this bucket + nearest_w_index = + (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1; + + // the "adjusted" weight must be high enough so that this value will fall in the bucket. + // NOTE: cumulative_weight expects an absolute index into the input value stream, not a + // group-relative index + [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start); + adjusted_next_limit = max(next_limit, adjusted); } cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; last_inserted_index = nearest_w_index; @@ -360,8 +408,8 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @param num_groups The number of input groups * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total weight, + * size and start offset) * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory @@ -369,12 +417,12 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @returns A tuple containing the set of cluster weight limits for each group, a set of * list-style offsets indicating group sizes, and the total number of clusters */ -template +template std::tuple, std::unique_ptr, size_type> generate_group_cluster_info(int delta, size_type num_groups, NearestWeight nearest_weight, - TotalWeightIter total_weight, + GroupInfo group_info, CumulativeWeight cumulative_weight, bool has_nulls, rmm::cuda_stream_view stream, @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, nullptr, group_num_clusters.begin(), @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, group_cluster_wl.begin(), group_num_clusters.begin(), @@ -583,9 +631,8 @@ std::unique_ptr compute_tdigests(int delta, group_cluster_offsets = group_cluster_offsets->view().begin(), group_cumulative_weight] __device__(size_type value_index) -> size_type { // get group index, relative value index within the group and cumulative weight. - auto [group_index, relative_value_index, cumulative_weight] = + [[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] = group_cumulative_weight(value_index); - (void)relative_value_index; auto const num_clusters = group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index]; @@ -616,8 +663,9 @@ std::unique_ptr compute_tdigests(int delta, cudf::mutable_column_view weight_col(*centroid_weights); // reduce the centroids into the clusters - auto output = thrust::make_zip_iterator(thrust::make_tuple( + auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); + auto const num_values = std::distance(centroids_begin, centroids_end); thrust::reduce_by_key(rmm::exec_policy(stream), keys, @@ -640,12 +688,6 @@ std::unique_ptr compute_tdigests(int delta, mr); } -// retrieve total weight of scalar inputs by group index -struct scalar_total_weight { - size_type const* group_valid_counts; - __device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; } -}; - // return the min/max value of scalar inputs by group index template struct get_scalar_minmax { @@ -678,17 +720,15 @@ struct typed_group_tdigest { rmm::mr::device_memory_resource* mr) { // first, generate cluster weight information for each input group - auto total_weight = cudf::detail::make_counting_transform_iterator( - 0, scalar_total_weight{group_valid_counts.begin()}); - auto [group_cluster_wl, group_cluster_offsets, total_clusters] = - generate_group_cluster_info(delta, - num_groups, - nearest_value_scalar_weights{}, - total_weight, - cumulative_scalar_weight{group_offsets, group_labels}, - col.null_count() > 0, - stream, - mr); + auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( + delta, + num_groups, + nearest_value_scalar_weights{group_offsets.begin()}, + scalar_group_info{group_valid_counts.begin(), group_offsets.begin()}, + cumulative_scalar_weight{group_offsets, group_labels}, + col.null_count() > 0, + stream, + mr); // device column view. handy because the .element() function // automatically handles fixed-point conversions for us @@ -927,25 +967,15 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto const delta = max_centroids; // generate cluster info - auto total_group_weight = cudf::detail::make_counting_transform_iterator( - 0, - [outer_offsets = group_offsets.data(), - inner_offsets = tdigest_offsets.begin(), - cumulative_weights = - cumulative_weights->view().begin()] __device__(size_type group_index) -> double { - // if there's no weights in this group of digests at all, return 0. - auto const num_weights = - inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; - auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; - return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; - }); auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, num_groups, nearest_value_centroid_weights{cumulative_weights->view().begin(), group_offsets.data(), tdigest_offsets.begin()}, - total_group_weight, + centroid_group_info{cumulative_weights->view().begin(), + group_offsets.data(), + tdigest_offsets.begin()}, cumulative_centroid_weight{ cumulative_weights->view().begin(), group_labels, From de8c0b8ee90629d1880953413de4b47907627958 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 6 Jan 2022 20:07:23 -0800 Subject: [PATCH 7/9] Resolve racecheck errors in ORC kernels (#9916) Running ORC Python tests with `compute-sanitizer --tool racecheck` results in a number of errors/warnings. This PR resolves the errors originating in ORC kernels. Remaining errors come from `gpu_inflate`. Adds a few missing block/warp syncs and minor clean up in the affected code. Causes ~4~2% slowdown on average in ORC reader benchmarks. Not negligible, will double check whether the changes are required, or just resolving false positives in `racecheck`. Ran the benchmarks many more times, and the average time difference is smaller than variations between runs. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Elias Stehle (https://github.com/elstehle) - Devavret Makkar (https://github.com/devavret) URL: https://github.com/rapidsai/cudf/pull/9916 --- cpp/src/io/comp/gpuinflate.cu | 17 +++++++---------- cpp/src/io/orc/stripe_data.cu | 35 ++++++++++++++++++----------------- cpp/src/io/orc/stripe_enc.cu | 7 ++----- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 338af72e4c9..dab8ce1afa5 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -780,22 +780,19 @@ __device__ void process_symbols(inflate_state_s* s, int t) do { volatile uint32_t* b = &s->x.u.symqueue[batch * batch_size]; - int batch_len, pos; - int32_t symt; - uint32_t lit_mask; - + int batch_len = 0; if (t == 0) { while ((batch_len = s->x.batch_len[batch]) == 0) {} - } else { - batch_len = 0; } batch_len = shuffle(batch_len); if (batch_len < 0) { break; } - symt = (t < batch_len) ? b[t] : 256; - lit_mask = ballot(symt >= 256); - pos = min((__ffs(lit_mask) - 1) & 0xff, 32); + auto const symt = (t < batch_len) ? b[t] : 256; + auto const lit_mask = ballot(symt >= 256); + auto pos = min((__ffs(lit_mask) - 1) & 0xff, 32); + if (t == 0) { s->x.batch_len[batch] = 0; } + if (t < pos && out + t < outend) { out[t] = symt; } out += pos; batch_len -= pos; @@ -825,7 +822,7 @@ __device__ void process_symbols(inflate_state_s* s, int t) } } batch = (batch + 1) & (batch_count - 1); - } while (1); + } while (true); if (t == 0) { s->out = out; } } diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 8f8bb87d9e4..05bc25597c2 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -409,7 +409,7 @@ inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int p if (b > 0x7f) { b = bytestream_readbyte(bs, pos++); v = (v & 0x0fffffff) | (b << 28); - if (sizeof(T) > 4) { + if constexpr (sizeof(T) > 4) { uint32_t lo = v; uint64_t hi; v = b >> 4; @@ -650,13 +650,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, int t, bool has_buffered_values = false) { - uint32_t numvals, numruns; - int r, tr; - if (t == 0) { uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u)); uint32_t lastpos = bs->pos; - numvals = numruns = 0; + auto numvals = 0; + auto numruns = 0; // Find the length and start location of each run while (numvals < maxvals) { uint32_t pos = lastpos; @@ -713,9 +711,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, } __syncthreads(); // Process the runs, 1 warp per run - numruns = rle->num_runs; - r = t >> 5; - tr = t & 0x1f; + auto const numruns = rle->num_runs; + auto const r = t >> 5; + auto const tr = t & 0x1f; for (uint32_t run = r; run < numruns; run += num_warps) { uint32_t base, pos, w, n; int mode; @@ -731,7 +729,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, w = 8 + (byte0 & 0x38); // 8 to 64 bits n = 3 + (byte0 & 7); // 3 to 10 values bytestream_readbe(bs, pos * 8, w, baseval); - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { rle->baseval.u32[r] = baseval; } else { rle->baseval.u64[r] = baseval; @@ -746,7 +744,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, uint32_t byte3 = bytestream_readbyte(bs, pos++); uint32_t bw = 1 + (byte2 >> 5); // base value width, 1 to 8 bytes uint32_t pw = kRLEv2_W[byte2 & 0x1f]; // patch width, 1 to 64 bits - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { uint32_t baseval, mask; bytestream_readbe(bs, pos * 8, bw * 8, baseval); mask = (1 << (bw * 8 - 1)) - 1; @@ -766,7 +764,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, int64_t delta; // Delta pos = decode_varint(bs, pos, baseval); - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { rle->baseval.u32[r] = baseval; } else { rle->baseval.u64[r] = baseval; @@ -782,8 +780,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, pos = shuffle(pos); n = shuffle(n); w = shuffle(w); + __syncwarp(); // Not required, included to fix the racecheck warning for (uint32_t i = tr; i < n; i += 32) { - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { if (mode == 0) { vals[base + i] = rle->baseval.u32[r]; } else if (mode == 1) { @@ -860,7 +859,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, if (j & i) vals[base + j] += vals[base + ((j & ~i) | (i - 1))]; } } - if (sizeof(T) <= 4) + if constexpr (sizeof(T) <= 4) baseval = rle->baseval.u32[r]; else baseval = rle->baseval.u64[r]; @@ -868,6 +867,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, vals[base + j] += baseval; } } + __syncwarp(); } __syncthreads(); return rle->num_vals; @@ -1679,11 +1679,12 @@ __global__ void __launch_bounds__(block_size) } } } - if (t == 0 && numvals + vals_skipped > 0 && numvals < s->top.data.max_vals) { - if (s->chunk.type_kind == TIMESTAMP) { - s->top.data.buffered_count = s->top.data.max_vals - numvals; + if (t == 0 && numvals + vals_skipped > 0) { + auto const max_vals = s->top.data.max_vals; + if (max_vals > numvals) { + if (s->chunk.type_kind == TIMESTAMP) { s->top.data.buffered_count = max_vals - numvals; } + s->top.data.max_vals = numvals; } - s->top.data.max_vals = numvals; } __syncthreads(); // Use the valid bits to compute non-null row positions until we get a full batch of values to diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 829e4877c44..660ec025d00 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -414,7 +414,7 @@ static __device__ uint32_t IntegerRLE( uint32_t mode1_w, mode2_w; typename std::make_unsigned::type vrange_mode1, vrange_mode2; block_vmin = static_cast(vmin); - if (sizeof(T) > 4) { + if constexpr (sizeof(T) > 4) { vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; vrange_mode2 = vmax - vmin; mode1_w = 8 - min(CountLeadingBytes64(vrange_mode1), 7); @@ -705,10 +705,7 @@ static __device__ void encode_null_mask(orcenc_state_s* s, } // reset shared state - if (t == 0) { - s->nnz = 0; - s->numvals = 0; - } + if (t == 0) { s->nnz = 0; } } /** From 42a0e55b34cfe8b08872cd05bc3e8a60faa266fb Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 7 Jan 2022 13:20:24 -0800 Subject: [PATCH 8/9] Clean up CUDA stream use in cuIO (#9991) Removes defaults for internal stream parameters. Fixes cases where default stream was used instead of propagating the stream. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - David Wendt (https://github.com/davidwendt) - Robert Maynard (https://github.com/robertmaynard) URL: https://github.com/rapidsai/cudf/pull/9991 --- cpp/src/io/avro/avro_gpu.h | 12 +++--- cpp/src/io/avro/reader_impl.cu | 8 ++-- cpp/src/io/comp/gpuinflate.h | 46 +++++++++++----------- cpp/src/io/csv/reader_impl.cu | 4 +- cpp/src/io/csv/writer_impl.cu | 9 ++--- cpp/src/io/orc/timezone.cuh | 7 +++- cpp/src/io/orc/writer_impl.hpp | 4 +- cpp/src/io/parquet/parquet_gpu.hpp | 4 +- cpp/src/io/parquet/reader_impl.cu | 6 +-- cpp/src/io/parquet/writer_impl.hpp | 4 +- cpp/src/io/utilities/column_buffer.hpp | 39 ++++++++---------- cpp/src/io/utilities/hostdevice_vector.hpp | 13 ++---- cpp/tests/io/comp/decomp_test.cpp | 9 +++-- cpp/tests/utilities_tests/span_tests.cu | 16 ++++---- 14 files changed, 87 insertions(+), 94 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index c87ac8afb13..3811132435b 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,17 +47,17 @@ struct schemadesc_s { * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row * @param[in] min_row_size Minimum size in bytes of a row - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecodeAvroColumnData(cudf::device_span blocks, schemadesc_s* schema, cudf::device_span global_dictionary, uint8_t const* avro_data, uint32_t schema_len, - size_t max_rows = ~0, - size_t first_row = 0, - uint32_t min_row_size = 0, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + size_t max_rows, + size_t first_row, + uint32_t min_row_size, + rmm::cuda_stream_view stream); } // namespace gpu } // namespace avro diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index d908e6c8ed5..0fa5680c5d2 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -159,8 +159,8 @@ rmm::device_buffer decompress_data(datasource& source, if (meta.codec == "deflate") { size_t uncompressed_data_size = 0; - auto inflate_in = hostdevice_vector(meta.block_list.size()); - auto inflate_out = hostdevice_vector(meta.block_list.size()); + auto inflate_in = hostdevice_vector(meta.block_list.size(), stream); + auto inflate_out = hostdevice_vector(meta.block_list.size(), stream); // Guess an initial maximum uncompressed block size uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff; @@ -343,7 +343,7 @@ std::vector decode_data(metadata& meta, } // Build gpu schema - auto schema_desc = hostdevice_vector(meta.schema.size()); + auto schema_desc = hostdevice_vector(meta.schema.size(), stream); uint32_t min_row_data_size = 0; int skip_field_cnt = 0; diff --git a/cpp/src/io/comp/gpuinflate.h b/cpp/src/io/comp/gpuinflate.h index a37d282997e..3ca9c9eee10 100644 --- a/cpp/src/io/comp/gpuinflate.h +++ b/cpp/src/io/comp/gpuinflate.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,26 +49,26 @@ struct gpu_inflate_status_s { * * @param[in] inputs List of input argument structures * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures, default 1 - * @param[in] parse_hdr Whether or not to parse GZIP header, default false - * @param[in] stream CUDA stream to use, default 0 + * @param[in] count Number of input/output structures + * @param[in] parse_hdr Whether or not to parse GZIP header + * @param[in] stream CUDA stream to use */ cudaError_t gpuinflate(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, - int count = 1, - int parse_hdr = 0, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + int count, + int parse_hdr, + rmm::cuda_stream_view stream); /** * @brief Interface for copying uncompressed byte blocks * * @param[in] inputs List of input argument structures - * @param[in] count Number of input structures, default 1 - * @param[in] stream CUDA stream to use, default 0 + * @param[in] count Number of input structures + * @param[in] stream CUDA stream to use */ cudaError_t gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs, - int count = 1, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + int count, + rmm::cuda_stream_view stream); /** * @brief Interface for decompressing Snappy-compressed data @@ -78,13 +78,13 @@ cudaError_t gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs, * * @param[in] inputs List of input argument structures * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures, default 1 - * @param[in] stream CUDA stream to use, default 0 + * @param[in] count Number of input/output structures + * @param[in] stream CUDA stream to use */ cudaError_t gpu_unsnap(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, - int count = 1, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + int count, + rmm::cuda_stream_view stream); /** * @brief Computes the size of temporary memory for Brotli decompression @@ -105,15 +105,15 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0); * @param[out] outputs List of output status structures * @param[in] scratch Temporary memory for intermediate work * @param[in] scratch_size Size in bytes of the temporary memory - * @param[in] count Number of input/output structures, default 1 - * @param[in] stream CUDA stream to use, default 0 + * @param[in] count Number of input/output structures + * @param[in] stream CUDA stream to use */ cudaError_t gpu_debrotli(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, void* scratch, size_t scratch_size, - int count = 1, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + int count, + rmm::cuda_stream_view stream); /** * @brief Interface for compressing data with Snappy @@ -123,13 +123,13 @@ cudaError_t gpu_debrotli(gpu_inflate_input_s* inputs, * * @param[in] inputs List of input argument structures * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures, default 1 - * @param[in] stream CUDA stream to use, default 0 + * @param[in] count Number of input/output structures + * @param[in] stream CUDA stream to use */ cudaError_t gpu_snap(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, - int count = 1, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + int count, + rmm::cuda_stream_view stream); } // namespace io } // namespace cudf diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 7f032b6987c..0e50bb46232 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -237,7 +237,7 @@ std::pair, selected_rows_offsets> load_data_and_gather size_t buffer_size = std::min(max_chunk_bytes, data.size()); size_t max_blocks = std::max((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2); - hostdevice_vector row_ctx(max_blocks); + hostdevice_vector row_ctx(max_blocks, stream); size_t buffer_pos = std::min(range_begin - std::min(range_begin, sizeof(char)), data.size()); size_t pos = std::min(range_begin, data.size()); size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; diff --git a/cpp/src/io/csv/writer_impl.cu b/cpp/src/io/csv/writer_impl.cu index b9b6fc6cf94..1b66df860a3 100644 --- a/cpp/src/io/csv/writer_impl.cu +++ b/cpp/src/io/csv/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -137,10 +137,9 @@ struct column_to_strings_fn { (cudf::is_timestamp()) || (cudf::is_duration())); } - explicit column_to_strings_fn( - csv_writer_options const& options, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) + explicit column_to_strings_fn(csv_writer_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) : options_(options), stream_(stream), mr_(mr) { } diff --git a/cpp/src/io/orc/timezone.cuh b/cpp/src/io/orc/timezone.cuh index 77c2bd4ffa0..e15144f9ea5 100644 --- a/cpp/src/io/orc/timezone.cuh +++ b/cpp/src/io/orc/timezone.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -107,10 +107,13 @@ inline __device__ int32_t get_gmt_offset(cudf::device_span ttimes return get_gmt_offset_impl(ttimes.begin(), offsets.begin(), ttimes.size(), ts); } -struct timezone_table { +class timezone_table { int32_t gmt_offset = 0; rmm::device_uvector ttimes; rmm::device_uvector offsets; + + public: + // Safe to use the default stream, device_uvectors will not change after they are created empty timezone_table() : ttimes{0, rmm::cuda_stream_default}, offsets{0, rmm::cuda_stream_default} {} timezone_table(int32_t gmt_offset, rmm::device_uvector&& ttimes, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 80c22b09927..d989721334e 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -349,7 +349,7 @@ class writer::impl { private: rmm::mr::device_memory_resource* _mr = nullptr; // Cuda stream to be used - rmm::cuda_stream_view stream = rmm::cuda_stream_default; + rmm::cuda_stream_view stream; stripe_size_limits max_stripe_size; size_type row_index_stride; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 53bb11c8b70..b77eeac68f5 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -479,7 +479,7 @@ struct dremel_data { dremel_data get_dremel_data(column_view h_col, rmm::device_uvector const& d_nullability, std::vector const& nullability, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); + rmm::cuda_stream_view stream); /** * @brief Launches kernel for initializing encoder page fragments diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 957cc85454c..fc4afe951db 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1427,8 +1427,8 @@ void reader::impl::decode_page_data(hostdevice_vector& chu // In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector // to store all per-chunk pointers to nested data/nullmask. `chunk_offsets[i]` will store the // offset into `chunk_nested_data`/`chunk_nested_valids` for the array of pointers for chunk `i` - auto chunk_nested_valids = hostdevice_vector(sum_max_depths); - auto chunk_nested_data = hostdevice_vector(sum_max_depths); + auto chunk_nested_valids = hostdevice_vector(sum_max_depths, stream); + auto chunk_nested_data = hostdevice_vector(sum_max_depths, stream); auto chunk_offsets = std::vector(); // Update chunks with pointers to column data. diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index e41832aaabe..405ab0c2880 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -206,7 +206,7 @@ class writer::impl { // TODO : figure out if we want to keep this. It is currently unused. rmm::mr::device_memory_resource* _mr = nullptr; // Cuda stream to be used - rmm::cuda_stream_view stream = rmm::cuda_stream_default; + rmm::cuda_stream_view stream; size_t max_row_group_size = default_row_group_size_bytes; size_type max_row_group_rows = default_row_group_size_rows; diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 9300bd0f8b2..17df49009c2 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,11 +45,10 @@ namespace detail { * * @return `rmm::device_buffer` Device buffer allocation */ -inline rmm::device_buffer create_data( - data_type type, - size_type size, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +inline rmm::device_buffer create_data(data_type type, + size_type size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { std::size_t data_size = size_of(type) * size; @@ -75,9 +74,9 @@ struct column_buffer { // construct with a known size. allocates memory column_buffer(data_type _type, size_type _size, - bool _is_nullable = true, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) + bool _is_nullable, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) : type(_type), is_nullable(_is_nullable) { create(_size, stream, mr); @@ -93,9 +92,7 @@ struct column_buffer { // instantiate a column of known type with a specified size. Allows deferred creation for // preprocessing steps such as in the Parquet reader - void create(size_type _size, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + void create(size_type _size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); auto data() { return _strings ? _strings->data() : _data.data(); } auto data_size() const { return _strings ? _strings->size() : _data.size(); } @@ -134,11 +131,10 @@ struct column_buffer { * * @return `std::unique_ptr` Column from the existing device data */ -std::unique_ptr make_column( - column_buffer& buffer, - column_name_info* schema_info = nullptr, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr make_column(column_buffer& buffer, + column_name_info* schema_info, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Creates an equivalent empty column from an existing set of device memory buffers. @@ -155,11 +151,10 @@ std::unique_ptr make_column( * * @return `std::unique_ptr` Column from the existing device data */ -std::unique_ptr empty_like( - column_buffer& buffer, - column_name_info* schema_info = nullptr, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr empty_like(column_buffer& buffer, + column_name_info* schema_info, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace io diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index a7f9aec7bb4..cbf914b8da6 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,15 +45,12 @@ class hostdevice_vector { return *this; } - explicit hostdevice_vector(size_t max_size, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) + explicit hostdevice_vector(size_t max_size, rmm::cuda_stream_view stream) : hostdevice_vector(max_size, max_size, stream) { } - explicit hostdevice_vector(size_t initial_size, - size_t max_size, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) + explicit hostdevice_vector(size_t initial_size, size_t max_size, rmm::cuda_stream_view stream) : num_elements(initial_size), max_elements(max_size) { if (max_elements != 0) { @@ -148,9 +145,7 @@ namespace detail { template class hostdevice_2dvector { public: - hostdevice_2dvector(size_t rows, - size_t columns, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) + hostdevice_2dvector(size_t rows, size_t columns, rmm::cuda_stream_view stream) : _size{rows, columns}, _data{rows * columns, stream} { } diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index 8247ced4629..dd00b201df9 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -97,7 +97,7 @@ struct GzipDecompressTest : public DecompressTest { cudaError_t dispatch(cudf::io::gpu_inflate_input_s* d_inf_args, cudf::io::gpu_inflate_status_s* d_inf_stat) { - return cudf::io::gpuinflate(d_inf_args, d_inf_stat, 1, 1); + return cudf::io::gpuinflate(d_inf_args, d_inf_stat, 1, 1, rmm::cuda_stream_default); } }; @@ -108,7 +108,7 @@ struct SnappyDecompressTest : public DecompressTest { cudaError_t dispatch(cudf::io::gpu_inflate_input_s* d_inf_args, cudf::io::gpu_inflate_status_s* d_inf_stat) { - return cudf::io::gpu_unsnap(d_inf_args, d_inf_stat, 1); + return cudf::io::gpu_unsnap(d_inf_args, d_inf_stat, 1, rmm::cuda_stream_default); } }; @@ -122,7 +122,8 @@ struct BrotliDecompressTest : public DecompressTest { rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), rmm::cuda_stream_default}; - return cudf::io::gpu_debrotli(d_inf_args, d_inf_stat, d_scratch.data(), d_scratch.size(), 1); + return cudf::io::gpu_debrotli( + d_inf_args, d_inf_stat, d_scratch.data(), d_scratch.size(), 1, rmm::cuda_stream_default); } }; diff --git a/cpp/tests/utilities_tests/span_tests.cu b/cpp/tests/utilities_tests/span_tests.cu index a9a5151e7c3..044ac3e60f7 100644 --- a/cpp/tests/utilities_tests/span_tests.cu +++ b/cpp/tests/utilities_tests/span_tests.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -248,9 +248,9 @@ class MdSpanTest : public cudf::test::BaseFixture { TEST(MdSpanTest, CanDetermineEmptiness) { - auto const vector = hostdevice_2dvector(1, 2); - auto const no_rows_vector = hostdevice_2dvector(0, 2); - auto const no_columns_vector = hostdevice_2dvector(1, 0); + auto const vector = hostdevice_2dvector(1, 2, rmm::cuda_stream_default); + auto const no_rows_vector = hostdevice_2dvector(0, 2, rmm::cuda_stream_default); + auto const no_columns_vector = hostdevice_2dvector(1, 0, rmm::cuda_stream_default); EXPECT_FALSE(host_2dspan{vector}.is_empty()); EXPECT_FALSE(device_2dspan{vector}.is_empty()); @@ -271,7 +271,7 @@ __global__ void readwrite_kernel(device_2dspan result) TEST(MdSpanTest, DeviceReadWrite) { - auto vector = hostdevice_2dvector(11, 23); + auto vector = hostdevice_2dvector(11, 23, rmm::cuda_stream_default); readwrite_kernel<<<1, 1>>>(vector); readwrite_kernel<<<1, 1>>>(vector); @@ -281,7 +281,7 @@ TEST(MdSpanTest, DeviceReadWrite) TEST(MdSpanTest, HostReadWrite) { - auto vector = hostdevice_2dvector(11, 23); + auto vector = hostdevice_2dvector(11, 23, rmm::cuda_stream_default); auto span = host_2dspan{vector}; span[5][6] = 5; if (span[5][6] == 5) { span[5][6] *= 6; } @@ -291,7 +291,7 @@ TEST(MdSpanTest, HostReadWrite) TEST(MdSpanTest, CanGetSize) { - auto const vector = hostdevice_2dvector(1, 2); + auto const vector = hostdevice_2dvector(1, 2, rmm::cuda_stream_default); EXPECT_EQ(host_2dspan{vector}.size(), vector.size()); EXPECT_EQ(device_2dspan{vector}.size(), vector.size()); @@ -299,7 +299,7 @@ TEST(MdSpanTest, CanGetSize) TEST(MdSpanTest, CanGetCount) { - auto const vector = hostdevice_2dvector(11, 23); + auto const vector = hostdevice_2dvector(11, 23, rmm::cuda_stream_default); EXPECT_EQ(host_2dspan{vector}.count(), 11ul * 23); EXPECT_EQ(device_2dspan{vector}.count(), 11ul * 23); From 7656277658a7c1bc90f144cc8e435f90bb17cac5 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Sat, 8 Jan 2022 03:59:47 +0530 Subject: [PATCH 9/9] Use default value for decimal precision in parquet writer when not specified (#9963) Fixes #9962 Authors: - Devavret Makkar (https://github.com/devavret) Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/9963 --- .../io/parquet/parquet_reader_benchmark.cpp | 20 ++++++++------ cpp/src/io/parquet/writer_impl.cu | 27 ++++++++++--------- cpp/tests/io/parquet_test.cpp | 3 --- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/cpp/benchmarks/io/parquet/parquet_reader_benchmark.cpp b/cpp/benchmarks/io/parquet/parquet_reader_benchmark.cpp index a68ce2bd1a1..888102c03be 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_benchmark.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_benchmark.cpp @@ -89,14 +89,14 @@ void BM_parq_read_varying_options(benchmark::State& state) auto const use_pandas_metadata = (flags & 2) != 0; auto const ts_type = cudf::data_type{static_cast(state.range(state_idx++))}; - auto const data_types = - dtypes_for_column_selection(get_type_or_group({int32_t(type_group_id::INTEGRAL), - int32_t(type_group_id::FLOATING_POINT), - int32_t(type_group_id::FIXED_POINT), - int32_t(type_group_id::TIMESTAMP), - int32_t(cudf::type_id::STRING), - int32_t(cudf::type_id::LIST)}), - col_sel); + auto const data_types = dtypes_for_column_selection( + get_type_or_group({static_cast(type_group_id::INTEGRAL), + static_cast(type_group_id::FLOATING_POINT), + static_cast(type_group_id::FIXED_POINT), + static_cast(type_group_id::TIMESTAMP), + static_cast(cudf::type_id::STRING), + static_cast(cudf::type_id::LIST)}), + col_sel); auto const tbl = create_random_table(data_types, data_types.size(), table_size_bytes{data_size}); auto const view = tbl->view(); @@ -181,6 +181,9 @@ BENCHMARK_REGISTER_F(ParquetRead, column_selection) ->Unit(benchmark::kMillisecond) ->UseManualTime(); +// Disabled until we add an API to read metadata from a parquet file and determine num row groups. +// https://github.com/rapidsai/cudf/pull/9963#issuecomment-1004832863 +/* BENCHMARK_DEFINE_F(ParquetRead, row_selection) (::benchmark::State& state) { BM_parq_read_varying_options(state); } BENCHMARK_REGISTER_F(ParquetRead, row_selection) @@ -191,6 +194,7 @@ BENCHMARK_REGISTER_F(ParquetRead, row_selection) {int32_t(cudf::type_id::EMPTY)}}) ->Unit(benchmark::kMillisecond) ->UseManualTime(); +*/ BENCHMARK_DEFINE_F(ParquetRead, misc_options) (::benchmark::State& state) { BM_parq_read_varying_options(state); } diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index c1b67cbda07..b302516ba39 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -447,25 +447,28 @@ struct leaf_schema_fn { std::enable_if_t(), void> operator()() { if (std::is_same_v) { - col_schema.type = Type::INT32; - col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.type = Type::INT32; + col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.decimal_precision = 9; } else if (std::is_same_v) { - col_schema.type = Type::INT64; - col_schema.stats_dtype = statistics_dtype::dtype_decimal64; + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_decimal64; + col_schema.decimal_precision = 18; } else if (std::is_same_v) { - col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; - col_schema.type_length = sizeof(__int128_t); - col_schema.stats_dtype = statistics_dtype::dtype_decimal128; + col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; + col_schema.type_length = sizeof(__int128_t); + col_schema.stats_dtype = statistics_dtype::dtype_decimal128; + col_schema.decimal_precision = 38; } else { CUDF_FAIL("Unsupported fixed point type for parquet writer"); } col_schema.converted_type = ConvertedType::DECIMAL; col_schema.decimal_scale = -col->type().scale(); // parquet and cudf disagree about scale signs - CUDF_EXPECTS(col_meta.is_decimal_precision_set(), - "Precision must be specified for decimal columns"); - CUDF_EXPECTS(col_meta.get_decimal_precision() >= col_schema.decimal_scale, - "Precision must be equal to or greater than scale!"); - col_schema.decimal_precision = col_meta.get_decimal_precision(); + if (col_meta.is_decimal_precision_set()) { + CUDF_EXPECTS(col_meta.get_decimal_precision() >= col_schema.decimal_scale, + "Precision must be equal to or greater than scale!"); + col_schema.decimal_precision = col_meta.get_decimal_precision(); + } } template diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 75ff39cbe70..9c656abb666 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2021,9 +2021,6 @@ TEST_F(ParquetWriterTest, DecimalWrite) cudf_io::parquet_writer_options args = cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, table); - // verify failure if no decimal precision given - EXPECT_THROW(cudf_io::write_parquet(args), cudf::logic_error); - cudf_io::table_input_metadata expected_metadata(table); // verify failure if too small a precision is given