From ee8cd59e297ff3d6e462247b35c46e39f761db68 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 10 May 2022 13:13:46 -0400 Subject: [PATCH 1/6] Improve coverage of dask-cudf's groupby aggregation, add tests for `dropna` support (#10449) This PR does the following: - Make sure that all of dask-cudf's `SUPPORTED_AGGS` have an overriding method for upstream Dask's series / dataframe groupby methods - Add tests comparing dask-cudf's `dropna` support to upstream Dask's, as at the moment we are only comparing against cuDF - Fix the resulting failures of these changes (by properly parsing `self.dropna` in dask-cudf's groupby code) As a side note, I think that a larger rethinking of dask-cudf's groupby would pay off well, as currently it seems like we have some "duplicate" tests and aren't really able to discern if `groupby_agg` was called for a supported aggregation Authors: - Charles Blackmon-Luca (https://github.com/charlesbluca) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Richard (Rick) Zamora (https://github.com/rjzamora) URL: https://github.com/rapidsai/cudf/pull/10449 --- python/dask_cudf/dask_cudf/groupby.py | 281 ++++++++++++++++-- .../dask_cudf/dask_cudf/tests/test_groupby.py | 101 ++++++- 2 files changed, 352 insertions(+), 30 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 684b1f71099..d137fac5fe3 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -35,6 +35,25 @@ ) +def _check_groupby_supported(func): + """ + Decorator for dask-cudf's groupby methods that returns the dask-cudf + method if the groupby object is supported, otherwise reverting to the + upstream Dask method + """ + + def wrapper(*args, **kwargs): + gb = args[0] + if _groupby_supported(gb): + return func(*args, **kwargs) + # note that we use upstream Dask's default kwargs for 
this call if + # none are specified; this shouldn't be an issue as those defaults are + # consistent with dask-cudf + return getattr(super(type(gb), gb), func.__name__)(*args[1:], **kwargs) + + return wrapper + + class CudfDataFrameGroupBy(DataFrameGroupBy): @_dask_cudf_nvtx_annotate def __init__(self, *args, **kwargs): @@ -65,6 +84,22 @@ def __getitem__(self, key): return g @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def count(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "count" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -72,13 +107,89 @@ def mean(self, split_every=None, split_out=1): {c: "mean" for c in self.obj.columns if c not in self.by}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def std(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "std" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def var(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "var" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def sum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "sum" for c in self.obj.columns 
if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def min(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "min" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, ) @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def max(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "max" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -86,10 +197,40 @@ def collect(self, split_every=None, split_out=1): {c: "collect" for c in self.obj.columns if c not in self.by}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def first(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "first" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def last(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "last" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, ) @_dask_cudf_nvtx_annotate @@ -98,17 +239,7 @@ def aggregate(self, 
arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - if ( - isinstance(self.obj, DaskDataFrame) - and ( - isinstance(self.by, str) - or ( - isinstance(self.by, list) - and all(isinstance(x, str) for x in self.by) - ) - ) - and _is_supported(arg, SUPPORTED_AGGS) - ): + if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): if isinstance(self._meta.grouping.keys, cudf.MultiIndex): keys = self._meta.grouping.keys.names else: @@ -120,10 +251,10 @@ def aggregate(self, arg, split_every=None, split_out=1): arg, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, ) return super().aggregate( @@ -139,6 +270,22 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def count(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "count"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -146,13 +293,14 @@ def mean(self, split_every=None, split_out=1): {self._slice: "mean"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported def std(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -160,13 +308,14 @@ def std(self, split_every=None, split_out=1): {self._slice: "std"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported def var(self, split_every=None, split_out=1): return 
groupby_agg( self.obj, @@ -174,13 +323,59 @@ def var(self, split_every=None, split_out=1): {self._slice: "var"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def sum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "sum"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def min(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "min"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def max(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "max"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -188,10 +383,40 @@ def collect(self, split_every=None, split_out=1): {self._slice: "collect"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def first(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "first"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def 
last(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "last"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate @@ -203,21 +428,17 @@ def aggregate(self, arg, split_every=None, split_out=1): if not isinstance(arg, dict): arg = {self._slice: arg} - if ( - isinstance(self.obj, DaskDataFrame) - and isinstance(self.by, (str, list)) - and _is_supported(arg, SUPPORTED_AGGS) - ): + if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): return groupby_agg( self.obj, self.by, arg, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] return super().aggregate( @@ -258,7 +479,7 @@ def groupby_agg( """ # Assert that aggregations are supported aggs = _redirect_aggs(aggs_in) - if not _is_supported(aggs, SUPPORTED_AGGS): + if not _aggs_supported(aggs, SUPPORTED_AGGS): raise ValueError( f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. " f"Aggregations must be specified with dict or list syntax." 
@@ -420,7 +641,7 @@ def _redirect_aggs(arg): @_dask_cudf_nvtx_annotate -def _is_supported(arg, supported: set): +def _aggs_supported(arg, supported: set): """Check that aggregations in `arg` are a subset of `supported`""" if isinstance(arg, (list, dict)): if isinstance(arg, dict): @@ -439,6 +660,14 @@ def _is_supported(arg, supported: set): return False +def _groupby_supported(gb): + """Check that groupby input is supported by dask-cudf""" + return isinstance(gb.obj, DaskDataFrame) and ( + isinstance(gb.by, str) + or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by)) + ) + + def _make_name(*args, sep="_"): """Combine elements of `args` into a new string""" _args = (arg for arg in args if arg != "") diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index e3545149c24..d2c9ecd0293 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -11,7 +11,7 @@ from cudf.core._compat import PANDAS_GE_120 import dask_cudf -from dask_cudf.groupby import SUPPORTED_AGGS, _is_supported +from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @@ -235,8 +235,7 @@ def test_groupby_split_out(split_out, column): @pytest.mark.parametrize( "by", ["a", "b", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]] ) -def test_groupby_dropna(dropna, by): - +def test_groupby_dropna_cudf(dropna, by): # NOTE: This test is borrowed from upstream dask # (dask/dask/dataframe/tests/test_groupby.py) df = cudf.DataFrame( @@ -265,6 +264,100 @@ def test_groupby_dropna(dropna, by): dd.assert_eq(dask_result, cudf_result) +@pytest.mark.parametrize( + "dropna,by", + [ + (False, "a"), + (False, "b"), + (False, "c"), + pytest.param( + False, + "d", + marks=pytest.mark.xfail( + reason="dropna=False is broken in Dask CPU for groupbys on " + "categorical columns" + ), + ), + pytest.param( + False, + ["a", "b"], + 
marks=pytest.mark.xfail( + reason="https://github.com/dask/dask/issues/8817" + ), + ), + pytest.param( + False, + ["a", "c"], + marks=pytest.mark.xfail( + reason="https://github.com/dask/dask/issues/8817" + ), + ), + pytest.param( + False, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + (True, "a"), + (True, "b"), + (True, "c"), + (True, "d"), + (True, ["a", "b"]), + (True, ["a", "c"]), + pytest.param( + True, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + (None, "a"), + (None, "b"), + (None, "c"), + (None, "d"), + (None, ["a", "b"]), + (None, ["a", "c"]), + pytest.param( + None, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + ], +) +def test_groupby_dropna_dask(dropna, by): + # NOTE: This test is borrowed from upstream dask + # (dask/dask/dataframe/tests/test_groupby.py) + df = pd.DataFrame( + { + "a": [1, 2, 3, 4, None, None, 7, 8], + "b": [1, None, 1, 3, None, 3, 1, 3], + "c": ["a", "b", None, None, "e", "f", "g", "h"], + "e": [4, 5, 6, 3, 2, 1, 0, 0], + } + ) + df["b"] = df["b"].astype("datetime64[ns]") + df["d"] = df["c"].astype("category") + + gdf = cudf.from_pandas(df) + ddf = dd.from_pandas(df, npartitions=3) + gddf = dask_cudf.from_cudf(gdf, npartitions=3) + + if dropna is None: + dask_cudf_result = gddf.groupby(by).e.sum() + dask_result = ddf.groupby(by).e.sum() + else: + dask_cudf_result = gddf.groupby(by, dropna=dropna).e.sum() + dask_result = ddf.groupby(by, dropna=dropna).e.sum() + + dd.assert_eq(dask_cudf_result, dask_result) + + @pytest.mark.parametrize("myindex", [[1, 2] * 4, ["s1", "s2"] * 4]) def test_groupby_string_index_name(myindex): # GH-Issue #3420 @@ -570,7 +663,7 @@ def test_groupby_agg_redirect(aggregations): [["not_supported"], {"a": "not_supported"}, {"a": 
["not_supported"]}], ) def test_is_supported(arg): - assert _is_supported(arg, {"supported"}) is False + assert _aggs_supported(arg, {"supported"}) is False def test_groupby_unique_lists(): From 0fcd364e00c2ec84a0128bf9b2688985e99e8e1f Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 10 May 2022 13:29:37 -0500 Subject: [PATCH 2/6] Enable writing to `s3` storage in chunked parquet writer (#10769) Resolves: #10522 This PR: - [x] Enables `s3` writing support in `ParquetDatasetWriter` - [x] Add's a work-around to reading an `s3` directory in `cudf.read_parquet`. Issue here: https://issues.apache.org/jira/browse/ARROW-16438 - [x] Introduces all the required `s3` python library combinations that will work together with such that `test_s3.py` can be run locally on dev environments. - [x] Improved the default `s3fs` error logs by changing the log level to `DEBUG` in pytests.(`S3FS_LOGGING_LEVEL`) Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) - Richard (Rick) Zamora (https://github.com/rjzamora) - Ayush Dattagupta (https://github.com/ayushdg) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/10769 --- conda/environments/cudf_dev_cuda11.5.yml | 5 + docs/cudf/source/api_docs/io.rst | 4 + python/cudf/cudf/io/__init__.py | 3 +- python/cudf/cudf/io/parquet.py | 168 +++++++++------- python/cudf/cudf/tests/test_s3.py | 185 ++++++++++++------ .../dask_cudf/dask_cudf/io/tests/test_s3.py | 29 +-- 6 files changed, 244 insertions(+), 150 deletions(-) diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 1b79bdb763f..d6d05926099 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -67,6 +67,11 @@ dependencies: - pydata-sphinx-theme - librdkafka=1.7.0 - python-confluent-kafka=1.7.0 + - moto>=3.1.6 + - boto3>=1.21.21 + - 
botocore>=1.24.21 + - aiobotocore>=2.2.0 + - s3fs>=2022.3.0 - pip: - git+https://github.com/python-streamz/streamz.git@master - pyorc diff --git a/docs/cudf/source/api_docs/io.rst b/docs/cudf/source/api_docs/io.rst index 7e4d1b48c93..a52667cd3e4 100644 --- a/docs/cudf/source/api_docs/io.rst +++ b/docs/cudf/source/api_docs/io.rst @@ -36,6 +36,10 @@ Parquet read_parquet DataFrame.to_parquet cudf.io.parquet.read_parquet_metadata + :template: autosummary/class_with_autosummary.rst + + cudf.io.parquet.ParquetDatasetWriter + ORC ~~~ diff --git a/python/cudf/cudf/io/__init__.py b/python/cudf/cudf/io/__init__.py index 15404b26042..4ec84ecbc74 100644 --- a/python/cudf/cudf/io/__init__.py +++ b/python/cudf/cudf/io/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. from cudf.io.avro import read_avro from cudf.io.csv import read_csv, to_csv from cudf.io.dlpack import from_dlpack @@ -9,6 +9,7 @@ from cudf.io.parquet import ( merge_parquet_filemetadata, read_parquet, + ParquetDatasetWriter, read_parquet_metadata, write_to_dataset, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index d7e85d72ba0..a9398a3139f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -1,6 +1,8 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. 
import math +import shutil +import tempfile import warnings from collections import defaultdict from contextlib import ExitStack @@ -232,12 +234,15 @@ def _process_dataset( filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset + # TODO: Remove the if len(paths) workaround after following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 dataset = ds.dataset( - paths, + source=paths[0] if len(paths) == 1 else paths, filesystem=fs, format="parquet", partitioning="hive", ) + file_list = dataset.files if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") @@ -837,6 +842,67 @@ def _parse_bytes(s): class ParquetDatasetWriter: + """ + Write a parquet file or dataset incrementally + + Parameters + ---------- + path : str + A local directory path or S3 URL. Will be used as root directory + path while writing a partitioned dataset. + partition_cols : list + Column names by which to partition the dataset + Columns are partitioned in the order they are given + index : bool, default None + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. If ``None``, + index(es) other than RangeIndex will be saved as columns. + compression : {'snappy', None}, default 'snappy' + Name of the compression to use. Use ``None`` for no compression. + statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' + Level at which column statistics should be included in file. + max_file_size : int or str, default None + A file size that cannot be exceeded by the writer. + It is in bytes, if the input is int. + Size can also be a str in form or "10 MB", "1 GB", etc. + If this parameter is used, it is mandatory to pass + `file_name_prefix`. + file_name_prefix : str + This is a prefix to file names generated only when + `max_file_size` is specified. 
+ + + Examples + -------- + Using a context + + >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) + >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) + >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: + ... cw.write_table(df1) + ... cw.write_table(df2) + + By manually calling ``close()`` + + >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) + >>> cw.write_table(df1) + >>> cw.write_table(df2) + >>> cw.close() + + Both the methods will generate the same directory structure + + .. code-block:: none + + dataset/ + a=1 + .parquet + a=2 + .parquet + a=3 + .parquet + + """ + @_cudf_nvtx_annotate def __init__( self, @@ -847,68 +913,15 @@ def __init__( statistics="ROWGROUP", max_file_size=None, file_name_prefix=None, + **kwargs, ) -> None: - """ - Write a parquet file or dataset incrementally - - Parameters - ---------- - path : str - File path or Root Directory path. Will be used as Root Directory - path while writing a partitioned dataset. - partition_cols : list - Column names by which to partition the dataset - Columns are partitioned in the order they are given - index : bool, default None - If ``True``, include the dataframe’s index(es) in the file output. - If ``False``, they will not be written to the file. If ``None``, - index(es) other than RangeIndex will be saved as columns. - compression : {'snappy', None}, default 'snappy' - Name of the compression to use. Use ``None`` for no compression. - statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' - Level at which column statistics should be included in file. - max_file_size : int or str, default None - A file size that cannot be exceeded by the writer. - It is in bytes, if the input is int. - Size can also be a str in form or "10 MB", "1 GB", etc. - If this parameter is used, it is mandatory to pass - `file_name_prefix`. 
- file_name_prefix : str - This is a prefix to file names generated only when - `max_file_size` is specified. - - - Examples - ________ - Using a context - - >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) - >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) - >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: - ... cw.write_table(df1) - ... cw.write_table(df2) - - By manually calling ``close()`` - - >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) - >>> cw.write_table(df1) - >>> cw.write_table(df2) - >>> cw.close() - - Both the methods will generate the same directory structure - - .. code-block:: bash - - dataset/ - a=1 - .parquet - a=2 - .parquet - a=3 - .parquet + if isinstance(path, str) and path.startswith("s3://"): + self.fs_meta = {"is_s3": True, "actual_path": path} + self.path = tempfile.TemporaryDirectory().name + else: + self.fs_meta = {} + self.path = path - """ - self.path = path self.common_args = { "index": index, "compression": compression, @@ -923,6 +936,7 @@ def __init__( # Map of partition_col values to their ParquetWriter's index # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} + self.kwargs = kwargs self.filename = file_name_prefix self.max_file_size = max_file_size if max_file_size is not None: @@ -1051,18 +1065,19 @@ def write_table(self, df): ] cw.write_table(grouped_df, this_cw_part_info) - # Create new cw for unhandled paths encountered in this write_table - new_paths, part_info, meta_paths = zip(*new_cw_paths) - self._chunked_writers.append( - ( - ParquetWriter(new_paths, **self.common_args), - new_paths, - meta_paths, + if new_cw_paths: + # Create new cw for unhandled paths encountered in this write_table + new_paths, part_info, meta_paths = zip(*new_cw_paths) + self._chunked_writers.append( + ( + ParquetWriter(new_paths, **self.common_args), + new_paths, + meta_paths, + ) ) - ) - new_cw_idx = len(self._chunked_writers) - 1 
- self.path_cw_map.update({k: new_cw_idx for k in new_paths}) - self._chunked_writers[-1][0].write_table(grouped_df, part_info) + new_cw_idx = len(self._chunked_writers) - 1 + self.path_cw_map.update({k: new_cw_idx for k in new_paths}) + self._chunked_writers[-1][0].write_table(grouped_df, part_info) @_cudf_nvtx_annotate def close(self, return_metadata=False): @@ -1076,6 +1091,15 @@ def close(self, return_metadata=False): for cw, _, meta_path in self._chunked_writers ] + if self.fs_meta.get("is_s3", False): + local_path = self.path + s3_path = self.fs_meta["actual_path"] + s3_file, _ = ioutils._get_filesystem_and_paths( + s3_path, **self.kwargs + ) + s3_file.put(local_path, s3_path, recursive=True) + shutil.rmtree(self.path) + if return_metadata: return ( merge_parquet_filemetadata(metadata) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index e8d93caaf55..0966bee93fd 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -2,6 +2,7 @@ import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -18,12 +19,22 @@ import cudf from cudf.testing._utils import assert_eq -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + @contextmanager def ensure_safe_environment_variables(): """ @@ -40,7 +51,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -49,15 +60,11 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -82,13 +89,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} @@ -141,13 +145,13 @@ def pdf_ext(scope="module"): def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): # Write to buffer fname = "test_csv_reader.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=False, @@ -155,9 +159,9 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): assert_eq(pdf, got) # Use Arrow PythonFile 
object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=True, @@ -168,13 +172,13 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): def test_read_csv_arrow_nativefile(s3_base, s3so, pdf): # Write to buffer fname = "test_csv_reader_arrow_nativefile.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_csv(fil) assert_eq(pdf, got) @@ -187,13 +191,13 @@ def test_read_csv_byte_range( ): # Write to buffer fname = "test_csv_reader_byte_range.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, byte_range=(74, 73), bytes_per_thread=bytes_per_thread, @@ -209,19 +213,19 @@ def test_read_csv_byte_range( def test_write_csv(s3_base, s3so, pdf, chunksize): # Write to buffer fname = "test_csv_writer.csv" - bname = "csv" + bucket = "csv" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", index=False, chunksize=chunksize, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert 
s3fs.exists(f"s3://{bucket}/{fname}") # TODO: Update to use `storage_options` from pandas v1.2.0 - got = pd.read_csv(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_csv(s3fs.open(f"s3://{bucket}/{fname}")) assert_eq(pdf, got) @@ -240,15 +244,15 @@ def test_read_parquet( use_python_file_object, ): fname = "test_parquet_reader.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", open_file_options=( {"precache_options": {"method": precache}} if use_python_file_object @@ -264,11 +268,11 @@ def test_read_parquet( # Check fsspec file-object handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): - fs = get_fs_token_paths(f"s3://{bname}/{fname}", storage_options=s3so)[ - 0 - ] - with fs.open(f"s3://{bname}/{fname}", mode="rb") as f: + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): + fs = get_fs_token_paths( + f"s3://{bucket}/{fname}", storage_options=s3so + )[0] + with fs.open(f"s3://{bucket}/{fname}", mode="rb") as f: got2 = cudf.read_parquet( f, bytes_per_thread=bytes_per_thread, @@ -290,7 +294,7 @@ def test_read_parquet_ext( index, ): fname = "test_parquet_reader_ext.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() if index: @@ -300,9 +304,9 @@ def test_read_parquet_ext( # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, footer_sample_size=3200, @@ -323,15 +327,15 @@ def test_read_parquet_ext( def 
test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): # Write to buffer fname = "test_parquet_reader_arrow_nativefile.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_parquet(fil, columns=columns) expect = pdf[columns] if columns else pdf @@ -341,14 +345,14 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): @pytest.mark.parametrize("precache", [None, "parquet"]) def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): fname = "test_parquet_reader_filters.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf_ext.to_parquet(path=buffer) buffer.seek(0) filters = [("String", "==", "Omega")] - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, filters=filters, open_file_options={"precache_options": {"method": precache}}, @@ -360,25 +364,38 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): @pytest.mark.parametrize("partition_cols", [None, ["String"]]) def test_write_parquet(s3_base, s3so, pdf, partition_cols): - fname = "test_parquet_writer.parquet" - bname = "parquet" + fname_cudf = "test_parquet_writer_cudf" + fname_pandas = "test_parquet_writer_pandas" + bucket = "parquet" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_parquet( - f"s3://{bname}/{fname}", + 
f"s3://{bucket}/{fname_cudf}", + partition_cols=partition_cols, + storage_options=s3so, + ) + assert s3fs.exists(f"s3://{bucket}/{fname_cudf}") + pdf.to_parquet( + f"s3://{bucket}/{fname_pandas}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname_pandas}") - got = pd.read_parquet(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_parquet( + f"s3://{bucket}/{fname_pandas}", storage_options=s3so + ) + expect = cudf.read_parquet( + f"s3://{bucket}/{fname_cudf}", storage_options=s3so + ) - assert_eq(pdf, got) + assert_eq(expect, got) def test_read_json(s3_base, s3so): fname = "test_json_reader.json" - bname = "json" + bucket = "json" # TODO: After following bug is fixed switch # back to using bytes: # https://github.com/pandas-dev/pandas/issues/46935 @@ -396,9 +413,9 @@ def test_read_json(s3_base, s3so): '{"amount": 400, "name": "Dennis"}\n' ) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_json( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", engine="cudf", orient="records", lines=True, @@ -414,15 +431,15 @@ def test_read_json(s3_base, s3so): def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_orc( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", columns=columns, storage_options=s3so, use_python_file_object=use_python_file_object, @@ -437,17 +454,17 @@ def test_read_orc(s3_base, s3so, datadir, use_python_file_object, 
columns): def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_orc(fil, columns=columns) if columns: @@ -457,13 +474,51 @@ def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): def test_write_orc(s3_base, s3so, pdf): fname = "test_orc_writer.orc" - bname = "orc" + bucket = "orc" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: - gdf.to_orc(f"s3://{bname}/{fname}", storage_options=s3so) - assert s3fs.exists(f"s3://{bname}/{fname}") + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: + gdf.to_orc(f"s3://{bucket}/{fname}", storage_options=s3so) + assert s3fs.exists(f"s3://{bucket}/{fname}") - with s3fs.open(f"s3://{bname}/{fname}") as f: + with s3fs.open(f"s3://{bucket}/{fname}") as f: got = pa.orc.ORCFile(f).read().to_pandas() assert_eq(pdf, got) + + +def test_write_chunked_parquet(s3_base, s3so): + df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}) + df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]}) + dirname = "chunked_writer_directory" + bucket = "parquet" + from cudf.io.parquet import ParquetDatasetWriter + + with s3_context( + s3_base=s3_base, bucket=bucket, files={dirname: BytesIO()} + ) as s3fs: + with ParquetDatasetWriter( + f"s3://{bucket}/{dirname}", + partition_cols=["a"], + storage_options=s3so, + ) as cw: + cw.write_table(df1) + cw.write_table(df2) + + # TODO: Replace following 
workaround with: + # expect = cudf.read_parquet(f"s3://{bucket}/{dirname}/", + # storage_options=s3so) + # after the following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + + dfs = [] + for folder in {"a=1", "a=2", "a=3"}: + assert s3fs.exists(f"s3://{bucket}/{dirname}/{folder}") + for file in s3fs.ls(f"s3://{bucket}/{dirname}/{folder}"): + df = cudf.read_parquet("s3://" + file, storage_options=s3so) + dfs.append(df) + + actual = cudf.concat(dfs).astype("int64") + assert_eq( + actual.sort_values(["b"]).reset_index(drop=True), + cudf.concat([df1, df2]).sort_values(["b"]).reset_index(drop=True), + ) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 83ff1273b36..9283380296c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,5 +1,8 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. + import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -11,12 +14,22 @@ import dask_cudf -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + @contextmanager def ensure_safe_environment_variables(): """ @@ -33,7 +46,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -42,15 +55,10 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -75,13 +83,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} From 4539e5e60d2bc2c81338a5d646f5a9c3ac5ef7cf Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 10 May 2022 11:42:34 -0700 Subject: [PATCH 3/6] Refactor `cudf::contains`, renaming and switching parameters role (#10802) This PR does the following: * Renaming parameters of `cudf::contains`, changing from `t`/`values`, `col`/`value`, etc... into `haystack`/`needle` in a consistent way. * Switching the role of `haystack` and `needles` parameters of the overload `cudf::contains(column_view, column_view)`, which incorrectly searches for `haystack` inside `needles`. * Update the internal usage of that overloads in cudf. * Update unit tests. 
* Rewriting all `cudf::contains` doxygen. * And some minor code cleanup/refactor. Since the role of parameters is switched, this causes breaking changes. Closes https://github.com/rapidsai/cudf/issues/10725. In addition, this is also a foundation for more changes in `search.cu` to support nested types in https://github.com/rapidsai/cudf/pull/10656. Authors: - Nghia Truong (https://github.com/ttnghia) Approvers: - Jake Hemstad (https://github.com/jrhemstad) - Jason Lowe (https://github.com/jlowe) - David Wendt (https://github.com/davidwendt) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/10802 --- cpp/include/cudf/detail/search.hpp | 26 +- cpp/include/cudf/search.hpp | 141 +++++------ cpp/src/dictionary/remove_keys.cu | 4 +- cpp/src/dictionary/set_keys.cu | 2 +- cpp/src/search/search.cu | 228 ++++++++---------- cpp/tests/search/search_dictionary_test.cpp | 6 +- cpp/tests/search/search_test.cpp | 24 +- .../main/java/ai/rapids/cudf/ColumnView.java | 19 +- java/src/main/native/src/ColumnViewJni.cpp | 17 +- python/cudf/cudf/_lib/cpp/search.pxd | 10 +- python/cudf/cudf/core/column/string.py | 2 +- 11 files changed, 227 insertions(+), 252 deletions(-) diff --git a/cpp/include/cudf/detail/search.hpp b/cpp/include/cudf/detail/search.hpp index c986418c790..44067ff87c0 100644 --- a/cpp/include/cudf/detail/search.hpp +++ b/cpp/include/cudf/detail/search.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,11 +33,11 @@ namespace detail { * @param stream CUDA stream used for device memory operations and kernel launches. 
*/ std::unique_ptr lower_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -46,33 +46,29 @@ std::unique_ptr lower_bound( * @param stream CUDA stream used for device memory operations and kernel launches. */ std::unique_ptr upper_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @copydoc cudf::contains(column_view const&, scalar const&, - * rmm::mr::device_memory_resource*) + * @copydoc cudf::contains(column_view const&, scalar const&, rmm::mr::device_memory_resource*) * * @param stream CUDA stream used for device memory operations and kernel launches. */ -bool contains(column_view const& col, - scalar const& value, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); +bool contains(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream); /** - * @copydoc cudf::contains(column_view const&, column_view const&, - * rmm::mr::device_memory_resource*) + * @copydoc cudf::contains(column_view const&, column_view const&, rmm::mr::device_memory_resource*) * * @param stream CUDA stream used for device memory operations and kernel launches. 
*/ std::unique_ptr contains( column_view const& haystack, column_view const& needles, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); } // namespace detail diff --git a/cpp/include/cudf/search.hpp b/cpp/include/cudf/search.hpp index 56a31891e27..3b68923ee93 100644 --- a/cpp/include/cudf/search.hpp +++ b/cpp/include/cudf/search.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,134 +32,123 @@ namespace cudf { */ /** - * @brief Find smallest indices in a sorted table where values should be - * inserted to maintain order + * @brief Find smallest indices in a sorted table where values should be inserted to maintain order. * - * For each row v in @p values, find the first index in @p t where - * inserting the row will maintain the sort order of @p t + * For each row in `needles`, find the first index in `haystack` where inserting the row still + * maintains its sort order. 
* * @code{.pseudo} * Example: * * Single column: - * idx 0 1 2 3 4 - * column = { 10, 20, 20, 30, 50 } - * values = { 20 } - * result = { 1 } + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needles = { 20 } + * result = { 1 } * * Multi Column: - * idx 0 1 2 3 4 - * t = {{ 10, 20, 20, 20, 20 }, - * { 5.0, .5, .5, .7, .7 }, - * { 90, 77, 78, 61, 61 }} - * values = {{ 20 }, - * { .7 }, - * { 61 }} - * result = { 3 } + * idx 0 1 2 3 4 + * haystack = {{ 10, 20, 20, 20, 20 }, + * { 5.0, .5, .5, .7, .7 }, + * { 90, 77, 78, 61, 61 }} + * needles = {{ 20 }, + * { .7 }, + * { 61 }} + * result = { 3 } * @endcode * - * @param t Table to search - * @param values Find insert locations for these values - * @param column_order Vector of column sort order - * @param null_precedence Vector of null_precedence enums values - * @param mr Device memory resource used to allocate the returned column's device - * memory + * @param haystack The table containing search space. + * @param needles Values for which to find the insert locations in the search space. + * @param column_order Vector of column sort order. + * @param null_precedence Vector of null_precedence enum values. + * @param mr Device memory resource used to allocate the returned column's device memory. * @return A non-nullable column of cudf::size_type elements containing the insertion points. */ std::unique_ptr lower_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @brief Find largest indices in a sorted table where values should be - * inserted to maintain order + * @brief Find largest indices in a sorted table where values should be inserted to maintain order.
* - * For each row v in @p values, find the last index in @p t where - * inserting the row will maintain the sort order of @p t + * For each row in `needles`, find the last index in `haystack` where inserting the row still + * maintains its sort order. * * @code{.pseudo} * Example: * * Single Column: - * idx 0 1 2 3 4 - * column = { 10, 20, 20, 30, 50 } - * values = { 20 } - * result = { 3 } + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needles = { 20 } + * result = { 3 } * * Multi Column: - * idx 0 1 2 3 4 - * t = {{ 10, 20, 20, 20, 20 }, - * { 5.0, .5, .5, .7, .7 }, - * { 90, 77, 78, 61, 61 }} - * values = {{ 20 }, - * { .7 }, - * { 61 }} - * result = { 5 } + * idx 0 1 2 3 4 + * haystack = {{ 10, 20, 20, 20, 20 }, + * { 5.0, .5, .5, .7, .7 }, + * { 90, 77, 78, 61, 61 }} + * needles = {{ 20 }, + * { .7 }, + * { 61 }} + * result = { 5 } * @endcode * - * @param search_table Table to search - * @param values Find insert locations for these values - * @param column_order Vector of column sort order - * @param null_precedence Vector of null_precedence enums values - * @param mr Device memory resource used to allocate the returned column's device - * memory + * @param haystack The table containing search space. + * @param needles Values for which to find the insert locations in the search space. + * @param column_order Vector of column sort order. + * @param null_precedence Vector of null_precedence enum values. + * @param mr Device memory resource used to allocate the returned column's device memory. * @return A non-nullable column of cudf::size_type elements containing the insertion points.
*/ std::unique_ptr upper_bound( - table_view const& search_table, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @brief Find if the `value` is present in the `col` + * @brief Check if the given `needle` value exists in the `haystack` column. * - * @throws cudf::logic_error - * If `col.type() != values.type()` + * @throws cudf::logic_error If `haystack.type() != needle.type()`. * * @code{.pseudo} * Single Column: - * idx 0 1 2 3 4 - * col = { 10, 20, 20, 30, 50 } - * Scalar: - * value = { 20 } - * result = true + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needle = { 20 } + * result = true * @endcode * - * @param col A column object - * @param value A scalar value to search for in `col` - * - * @return bool If `value` is found in `column` true, else false. + * @param haystack The column containing search space. + * @param needle A scalar value to check for existence in the search space. + * @return true if the given `needle` value exists in the `haystack` column. */ -bool contains(column_view const& col, scalar const& value); +bool contains(column_view const& haystack, scalar const& needle); /** - * @brief Returns a new column of type bool identifying for each element of @p haystack column, - * if that element is contained in @p needles column. + * @brief Check if the given `needles` values exist in the `haystack` column. * - * The new column will have the same dimension and null status as the @p haystack column. That is, - * any element that is invalid in the @p haystack column will be invalid in the returned column. + * The new column will have type BOOL and have the same size and null mask as the input `needles` + * column. That is, any null row in the `needles` column will result in a null row in the output + * column.
* - * @throws cudf::logic_error - * If `haystack.type() != needles.type()` + * @throws cudf::logic_error If `haystack.type() != needles.type()` * * @code{.pseudo} * haystack = { 10, 20, 30, 40, 50 } * needles = { 20, 40, 60, 80 } - * - * result = { false, true, false, true, false } + * result = { true, true, false, false } * @endcode * - * @param haystack A column object - * @param needles A column of values to search for in `col` - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @return A column of bool elements containing true if the corresponding entry in haystack - * appears in needles and false if it does not. + * @param haystack The column containing search space. + * @param needles A column of values to check for existence in the search space. + * @param mr Device memory resource used to allocate the returned column's device memory. + * @return A BOOL column indicating if each element in `needles` exists in the search space. */ std::unique_ptr contains( column_view const& haystack, diff --git a/cpp/src/dictionary/remove_keys.cu b/cpp/src/dictionary/remove_keys.cu index c4b3bbc00e4..a98e69149af 100644 --- a/cpp/src/dictionary/remove_keys.cu +++ b/cpp/src/dictionary/remove_keys.cu @@ -158,7 +158,7 @@ std::unique_ptr remove_keys( CUDF_EXPECTS(keys_view.type() == keys_to_remove.type(), "keys types must match"); // locate keys to remove by searching the keys column - auto const matches = cudf::detail::contains(keys_view, keys_to_remove, stream, mr); + auto const matches = cudf::detail::contains(keys_to_remove, keys_view, stream, mr); auto d_matches = matches->view().data(); // call common utility method to keep the keys not matched to keys_to_remove auto key_matcher = [d_matches] __device__(size_type idx) { return !d_matches[idx]; }; @@ -181,7 +181,7 @@ std::unique_ptr remove_unused_keys( thrust::sequence(rmm::exec_policy(stream), keys_positions.begin(), keys_positions.end()); // wrap the indices for comparison in 
contains() column_view keys_positions_view(data_type{type_id::UINT32}, keys_size, keys_positions.data()); - return cudf::detail::contains(keys_positions_view, indices_view, stream, mr); + return cudf::detail::contains(indices_view, keys_positions_view, stream, mr); }(); auto d_matches = matches->view().data(); diff --git a/cpp/src/dictionary/set_keys.cu b/cpp/src/dictionary/set_keys.cu index dfc6cbb78cc..25c46837e9f 100644 --- a/cpp/src/dictionary/set_keys.cu +++ b/cpp/src/dictionary/set_keys.cu @@ -138,7 +138,7 @@ std::unique_ptr set_keys( std::unique_ptr keys_column(std::move(sorted_keys.front())); // compute the new nulls - auto matches = cudf::detail::contains(keys, keys_column->view(), stream, mr); + auto matches = cudf::detail::contains(keys_column->view(), keys, stream, mr); auto d_matches = matches->view().data(); auto indices_itr = cudf::detail::indexalator_factory::make_input_iterator(dictionary_column.indices()); diff --git a/cpp/src/search/search.cu b/cpp/src/search/search.cu index 29eddf703df..491ad49e020 100644 --- a/cpp/src/search/search.cu +++ b/cpp/src/search/search.cu @@ -43,40 +43,9 @@ namespace cudf { namespace { -template -void launch_search(DataIterator it_data, - ValuesIterator it_vals, - size_type data_size, - size_type values_size, - OutputIterator it_output, - Comparator comp, - bool find_first, - rmm::cuda_stream_view stream) -{ - if (find_first) { - thrust::lower_bound(rmm::exec_policy(stream), - it_data, - it_data + data_size, - it_vals, - it_vals + values_size, - it_output, - comp); - } else { - thrust::upper_bound(rmm::exec_policy(stream), - it_data, - it_data + data_size, - it_vals, - it_vals + values_size, - it_output, - comp); - } -} -std::unique_ptr search_ordered(table_view const& t, - table_view const& values, +std::unique_ptr search_ordered(table_view const& haystack, + table_view const& needles, bool find_first, std::vector const& column_order, std::vector const& null_precedence, @@ -84,30 +53,30 @@ std::unique_ptr 
search_ordered(table_view const& t, rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS( - column_order.empty() or static_cast(t.num_columns()) == column_order.size(), + column_order.empty() or static_cast(haystack.num_columns()) == column_order.size(), "Mismatch between number of columns and column order."); - CUDF_EXPECTS( - null_precedence.empty() or static_cast(t.num_columns()) == null_precedence.size(), - "Mismatch between number of columns and null precedence."); + CUDF_EXPECTS(null_precedence.empty() or + static_cast(haystack.num_columns()) == null_precedence.size(), + "Mismatch between number of columns and null precedence."); // Allocate result column auto result = make_numeric_column( - data_type{type_to_id()}, values.num_rows(), mask_state::UNALLOCATED, stream, mr); - auto const result_out = result->mutable_view().data(); + data_type{type_to_id()}, needles.num_rows(), mask_state::UNALLOCATED, stream, mr); + auto const out_it = result->mutable_view().data(); // Handle empty inputs - if (t.num_rows() == 0) { + if (haystack.num_rows() == 0) { CUDF_CUDA_TRY( - cudaMemsetAsync(result_out, 0, values.num_rows() * sizeof(size_type), stream.value())); + cudaMemsetAsync(out_it, 0, needles.num_rows() * sizeof(size_type), stream.value())); return result; } // This utility will ensure all corresponding dictionary columns have matching keys. // It will return any new dictionary columns created as well as updated table_views. - auto const matched = dictionary::detail::match_dictionaries({t, values}, stream); + auto const matched = dictionary::detail::match_dictionaries({haystack, needles}, stream); // Prepare to flatten the structs column - auto const has_null_elements = has_nested_nulls(t) or has_nested_nulls(values); + auto const has_null_elements = has_nested_nulls(haystack) or has_nested_nulls(needles); auto const flatten_nullability = has_null_elements ? 
structs::detail::column_nullability::FORCE : structs::detail::column_nullability::MATCH_INCOMING; @@ -135,37 +104,50 @@ std::unique_ptr search_ordered(table_view const& t, rhs, column_order_dv.data(), null_precedence_dv.data()); - launch_search( - count_it, count_it, t.num_rows(), values.num_rows(), result_out, comp, find_first, stream); + + auto const do_search = [find_first](auto&&... args) { + if (find_first) { + thrust::lower_bound(std::forward(args)...); + } else { + thrust::upper_bound(std::forward(args)...); + } + }; + do_search(rmm::exec_policy(stream), + count_it, + count_it + haystack.num_rows(), + count_it, + count_it + needles.num_rows(), + out_it, + comp); return result; } struct contains_scalar_dispatch { template - bool operator()(column_view const& col, scalar const& value, rmm::cuda_stream_view stream) + bool operator()(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(col.type() == value.type(), "scalar and column types must match"); + CUDF_EXPECTS(haystack.type() == needle.type(), "scalar and column types must match"); using Type = device_storage_type_t; using ScalarType = cudf::scalar_type_t; - auto d_col = column_device_view::create(col, stream); - auto s = static_cast(&value); + auto d_haystack = column_device_view::create(haystack, stream); + auto s = static_cast(&needle); - if (col.has_nulls()) { + if (haystack.has_nulls()) { auto found_iter = thrust::find(rmm::exec_policy(stream), - d_col->pair_begin(), - d_col->pair_end(), + d_haystack->pair_begin(), + d_haystack->pair_end(), thrust::make_pair(s->value(stream), true)); - return found_iter != d_col->pair_end(); + return found_iter != d_haystack->pair_end(); } else { auto found_iter = thrust::find(rmm::exec_policy(stream), // - d_col->begin(), - d_col->end(), + d_haystack->begin(), + d_haystack->end(), s->value(stream)); - return found_iter != d_col->end(); + return found_iter != d_haystack->end(); } } }; @@ -179,66 +161,69 @@ bool 
contains_scalar_dispatch::operator()(column_view const&, } template <> -bool contains_scalar_dispatch::operator()(column_view const& col, - scalar const& value, +bool contains_scalar_dispatch::operator()(column_view const& haystack, + scalar const& needle, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(col.type() == value.type(), "scalar and column types must match"); + CUDF_EXPECTS(haystack.type() == needle.type(), "scalar and column types must match"); - auto const scalar_table = static_cast(&value)->view(); - CUDF_EXPECTS(col.num_children() == scalar_table.num_columns(), + auto const scalar_table = static_cast(&needle)->view(); + CUDF_EXPECTS(haystack.num_children() == scalar_table.num_columns(), "struct scalar and structs column must have the same number of children"); - for (size_type i = 0; i < col.num_children(); ++i) { - CUDF_EXPECTS(col.child(i).type() == scalar_table.column(i).type(), + for (size_type i = 0; i < haystack.num_children(); ++i) { + CUDF_EXPECTS(haystack.child(i).type() == scalar_table.column(i).type(), "scalar and column children types must match"); } // Prepare to flatten the structs column and scalar. - auto const has_null_elements = - has_nested_nulls(table_view{std::vector{col.child_begin(), col.child_end()}}) || - has_nested_nulls(scalar_table); + auto const has_null_elements = has_nested_nulls(table_view{std::vector{ + haystack.child_begin(), haystack.child_end()}}) || + has_nested_nulls(scalar_table); auto const flatten_nullability = has_null_elements ? structs::detail::column_nullability::FORCE : structs::detail::column_nullability::MATCH_INCOMING; // Flatten the input structs column, only materialize the bitmask if there is null in the input. 
- auto const col_flattened = - structs::detail::flatten_nested_columns(table_view{{col}}, {}, {}, flatten_nullability); - auto const val_flattened = + auto const haystack_flattened = + structs::detail::flatten_nested_columns(table_view{{haystack}}, {}, {}, flatten_nullability); + auto const needle_flattened = structs::detail::flatten_nested_columns(scalar_table, {}, {}, flatten_nullability); // The struct scalar only contains the struct member columns. // Thus, if there is any null in the input, we must exclude the first column in the flattened // table of the input column from searching because that column is the materialized bitmask of // the input structs column. - auto const col_flattened_content = col_flattened.flattened_columns(); - auto const col_flattened_children = table_view{std::vector{ - col_flattened_content.begin() + static_cast(has_null_elements), - col_flattened_content.end()}}; + auto const haystack_flattened_content = haystack_flattened.flattened_columns(); + auto const haystack_flattened_children = table_view{std::vector{ + haystack_flattened_content.begin() + static_cast(has_null_elements), + haystack_flattened_content.end()}}; - auto const d_col_children_ptr = table_device_view::create(col_flattened_children, stream); - auto const d_val_ptr = table_device_view::create(val_flattened, stream); + auto const d_haystack_children_ptr = + table_device_view::create(haystack_flattened_children, stream); + auto const d_needle_ptr = table_device_view::create(needle_flattened, stream); auto const start_iter = thrust::make_counting_iterator(0); - auto const end_iter = start_iter + col.size(); - auto const comp = row_equality_comparator( - nullate::DYNAMIC{has_null_elements}, *d_col_children_ptr, *d_val_ptr, null_equality::EQUAL); + auto const end_iter = start_iter + haystack.size(); + auto const comp = row_equality_comparator(nullate::DYNAMIC{has_null_elements}, + *d_haystack_children_ptr, + *d_needle_ptr, + null_equality::EQUAL); auto const found_iter = 
thrust::find_if( rmm::exec_policy(stream), start_iter, end_iter, [comp] __device__(auto const idx) { - return comp(idx, 0); // compare col[idx] == val[0]. + return comp(idx, 0); // compare haystack[idx] == val[0]. }); return found_iter != end_iter; } template <> -bool contains_scalar_dispatch::operator()(column_view const& col, - scalar const& value, +bool contains_scalar_dispatch::operator()(column_view const& haystack, + scalar const& needle, rmm::cuda_stream_view stream) { - auto dict_col = cudf::dictionary_column_view(col); - // first, find the value in the dictionary's key set - auto index = cudf::dictionary::detail::get_index(dict_col, value, stream); + auto dict_col = cudf::dictionary_column_view(haystack); + // first, find the needle in the dictionary's key set + auto index = cudf::dictionary::detail::get_index(dict_col, needle, stream); // if found, check the index is actually in the indices column return index->is_valid(stream) ? cudf::type_dispatcher(dict_col.indices().type(), contains_scalar_dispatch{}, @@ -251,12 +236,13 @@ bool contains_scalar_dispatch::operator()(column_view const& } // namespace namespace detail { -bool contains(column_view const& col, scalar const& value, rmm::cuda_stream_view stream) +bool contains(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream) { - if (col.is_empty()) { return false; } - if (not value.is_valid(stream)) { return col.has_nulls(); } + if (haystack.is_empty()) { return false; } + if (not needle.is_valid(stream)) { return haystack.has_nulls(); } - return cudf::type_dispatcher(col.type(), contains_scalar_dispatch{}, col, value, stream); + return cudf::type_dispatcher( + haystack.type(), contains_scalar_dispatch{}, haystack, needle, stream); } struct multi_contains_dispatch { @@ -267,44 +253,44 @@ struct multi_contains_dispatch { rmm::mr::device_memory_resource* mr) { std::unique_ptr result = make_numeric_column(data_type{type_to_id()}, - haystack.size(), - copy_bitmask(haystack), - 
haystack.null_count(), + needles.size(), + copy_bitmask(needles), + needles.null_count(), stream, mr); - if (haystack.is_empty()) { return result; } + if (needles.is_empty()) { return result; } mutable_column_view result_view = result.get()->mutable_view(); - if (needles.is_empty()) { + if (haystack.is_empty()) { thrust::fill( rmm::exec_policy(stream), result_view.begin(), result_view.end(), false); return result; } - auto hash_set = cudf::detail::unordered_multiset::create(needles, stream); + auto hash_set = cudf::detail::unordered_multiset::create(haystack, stream); auto device_hash_set = hash_set.to_device(); - auto d_haystack_ptr = column_device_view::create(haystack, stream); - auto d_haystack = *d_haystack_ptr; + auto d_needles_ptr = column_device_view::create(needles, stream); + auto d_needles = *d_needles_ptr; - if (haystack.has_nulls()) { + if (needles.has_nulls()) { thrust::transform(rmm::exec_policy(stream), thrust::make_counting_iterator(0), - thrust::make_counting_iterator(haystack.size()), + thrust::make_counting_iterator(needles.size()), result_view.begin(), - [device_hash_set, d_haystack] __device__(size_t index) { - return d_haystack.is_null_nocheck(index) || - device_hash_set.contains(d_haystack.element(index)); + [device_hash_set, d_needles] __device__(size_t index) { + return d_needles.is_null_nocheck(index) || + device_hash_set.contains(d_needles.element(index)); }); } else { thrust::transform(rmm::exec_policy(stream), thrust::make_counting_iterator(0), - thrust::make_counting_iterator(haystack.size()), + thrust::make_counting_iterator(needles.size()), result_view.begin(), - [device_hash_set, d_haystack] __device__(size_t index) { - return device_hash_set.contains(d_haystack.element(index)); + [device_hash_set, d_needles] __device__(size_t index) { + return device_hash_set.contains(d_needles.element(index)); }); } @@ -336,10 +322,10 @@ std::unique_ptr multi_contains_dispatch::operator()( dictionary_column_view const haystack(haystack_in); 
dictionary_column_view const needles(needles_in); // first combine keys so both dictionaries have the same set - auto haystack_matched = dictionary::detail::add_keys(haystack, needles.keys(), stream); - auto const haystack_view = dictionary_column_view(haystack_matched->view()); - auto needles_matched = dictionary::detail::set_keys(needles, haystack_view.keys(), stream); + auto needles_matched = dictionary::detail::add_keys(needles, haystack.keys(), stream); auto const needles_view = dictionary_column_view(needles_matched->view()); + auto haystack_matched = dictionary::detail::set_keys(haystack, needles_view.keys(), stream); + auto const haystack_view = dictionary_column_view(haystack_matched->view()); // now just use the indices for the contains column_view const haystack_indices = haystack_view.get_indices_annotated(); @@ -363,56 +349,56 @@ std::unique_ptr contains(column_view const& haystack, haystack.type(), multi_contains_dispatch{}, haystack, needles, stream, mr); } -std::unique_ptr lower_bound(table_view const& t, - table_view const& values, +std::unique_ptr lower_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return search_ordered(t, values, true, column_order, null_precedence, stream, mr); + return search_ordered(haystack, needles, true, column_order, null_precedence, stream, mr); } -std::unique_ptr upper_bound(table_view const& t, - table_view const& values, +std::unique_ptr upper_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return search_ordered(t, values, false, column_order, null_precedence, stream, mr); + return search_ordered(haystack, needles, false, column_order, null_precedence, stream, mr); } } // namespace detail // external APIs 
-std::unique_ptr lower_bound(table_view const& t, - table_view const& values, +std::unique_ptr lower_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); return detail::lower_bound( - t, values, column_order, null_precedence, rmm::cuda_stream_default, mr); + haystack, needles, column_order, null_precedence, rmm::cuda_stream_default, mr); } -std::unique_ptr upper_bound(table_view const& t, - table_view const& values, +std::unique_ptr upper_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); return detail::upper_bound( - t, values, column_order, null_precedence, rmm::cuda_stream_default, mr); + haystack, needles, column_order, null_precedence, rmm::cuda_stream_default, mr); } -bool contains(column_view const& col, scalar const& value) +bool contains(column_view const& haystack, scalar const& needle) { CUDF_FUNC_RANGE(); - return detail::contains(col, value, rmm::cuda_stream_default); + return detail::contains(haystack, needle, rmm::cuda_stream_default); } std::unique_ptr contains(column_view const& haystack, diff --git a/cpp/tests/search/search_dictionary_test.cpp b/cpp/tests/search/search_dictionary_test.cpp index 6b1caa5ed6f..9eba259ee39 100644 --- a/cpp/tests/search/search_dictionary_test.cpp +++ b/cpp/tests/search/search_dictionary_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -88,7 +88,7 @@ TEST_F(DictionarySearchTest, contains_dictionary) EXPECT_FALSE(cudf::contains(column, string_scalar{"28"})); cudf::test::dictionary_column_wrapper needles({"00", "17", "23", "27"}); - fixed_width_column_wrapper expect{1, 1, 1, 1, 1, 1, 0}; + fixed_width_column_wrapper expect{1, 1, 1, 0}; auto result = cudf::contains(column, needles); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expect); } @@ -101,7 +101,7 @@ TEST_F(DictionarySearchTest, contains_nullable_dictionary) EXPECT_FALSE(cudf::contains(column, numeric_scalar{28})); cudf::test::dictionary_column_wrapper needles({0, 17, 23, 27}); - fixed_width_column_wrapper expect({1, 0, 1, 1, 1, 1, 0}, {1, 0, 1, 1, 1, 1, 1}); + fixed_width_column_wrapper expect{1, 1, 1, 0}; auto result = cudf::contains(column, needles); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expect); } diff --git a/cpp/tests/search/search_test.cpp b/cpp/tests/search/search_test.cpp index 0a2533cd5f3..169eaffa41a 100644 --- a/cpp/tests/search/search_test.cpp +++ b/cpp/tests/search/search_test.cpp @@ -1627,7 +1627,7 @@ TEST_F(SearchTest, multi_contains_some) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{17, 19, 45, 72}; - fixed_width_column_wrapper expect{0, 0, 1, 1, 0, 0, 0}; + fixed_width_column_wrapper expect{1, 1, 0, 0}; auto result = cudf::contains(haystack, needles); @@ -1641,7 +1641,7 @@ TEST_F(SearchTest, multi_contains_none) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{2, 3}; - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1657,7 +1657,7 @@ TEST_F(SearchTest, multi_contains_some_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 1, 1, 0, 0, 0}; + fixed_width_column_wrapper expect{1, 1, 0, 0}; auto result = cudf::contains(haystack, 
needles); @@ -1673,7 +1673,7 @@ TEST_F(SearchTest, multi_contains_none_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1688,7 +1688,7 @@ TEST_F(SearchTest, multi_contains_some_with_nulls) {1, 1, 0, 1, 1, 1, 1}}; fixed_width_column_wrapper needles{{17, 19, 23, 72}, {1, 0, 1, 1}}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 1, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0, 1, 0}, {1, 0, 1, 1}}; auto result = cudf::contains(haystack, needles); @@ -1703,7 +1703,7 @@ TEST_F(SearchTest, multi_contains_none_with_nulls) {1, 1, 0, 1, 1, 1, 1}}; fixed_width_column_wrapper needles{{17, 19, 24, 72}, {1, 0, 1, 1}}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 0, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0, 0, 0}, {1, 0, 1, 1}}; auto result = cudf::contains(haystack, needles); @@ -1715,7 +1715,7 @@ TEST_F(SearchTest, multi_contains_some_string_with_nulls) std::vector h_haystack_strings{"0", "1", nullptr, "19", "23", "29", "71"}; std::vector h_needles_strings{"17", "23", nullptr, "72"}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 1, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 1, 0, 0}, {1, 1, 0, 1}}; cudf::test::strings_column_wrapper haystack( h_haystack_strings.begin(), @@ -1739,7 +1739,7 @@ TEST_F(SearchTest, multi_contains_none_string_with_nulls) std::vector h_haystack_strings{"0", "1", nullptr, "19", "23", "29", "71"}; std::vector h_needles_strings{"2", nullptr}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 0, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0}, {1, 0}}; cudf::test::strings_column_wrapper haystack( h_haystack_strings.begin(), @@ -1765,7 +1765,7 @@ TEST_F(SearchTest, multi_contains_empty_column) fixed_width_column_wrapper 
haystack{}; fixed_width_column_wrapper needles{2, 3}; - fixed_width_column_wrapper expect{}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1781,7 +1781,7 @@ TEST_F(SearchTest, multi_contains_empty_column_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{}; + fixed_width_column_wrapper expect{0, 0, 0, 0}; auto result = cudf::contains(haystack, needles); @@ -1795,7 +1795,7 @@ TEST_F(SearchTest, multi_contains_empty_input_set) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{}; - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{}; auto result = cudf::contains(haystack, needles); @@ -1811,7 +1811,7 @@ TEST_F(SearchTest, multi_contains_empty_input_set_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{}; auto result = cudf::contains(haystack, needles); diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index e871da18966..9f07b130a83 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -1769,22 +1769,23 @@ public boolean contains(Scalar needle) { } /** - * Returns a new ColumnVector of {@link DType#BOOL8} elements containing true if the corresponding - * entry in haystack is contained in needles and false if it is not. The caller will be responsible - * for the lifecycle of the new vector. + * Returns a new column of {@link DType#BOOL8} elements having the same size as this column, + * each row value is true if the corresponding entry in this column is contained in the + * given searchSpace column and false if it is not. 
+ * The caller will be responsible for the lifecycle of the new vector. * * example: * - * haystack = { 10, 20, 30, 40, 50 } - * needles = { 20, 40, 60, 80 } + * col = { 10, 20, 30, 40, 50 } + * searchSpace = { 20, 40, 60, 80 } * * result = { false, true, false, true, false } * - * @param needles + * @param searchSpace * @return A new ColumnVector of type {@link DType#BOOL8} */ - public final ColumnVector contains(ColumnView needles) { - return new ColumnVector(containsVector(getNativeView(), needles.getNativeView())); + public final ColumnVector contains(ColumnView searchSpace) { + return new ColumnVector(containsVector(getNativeView(), searchSpace.getNativeView())); } /** @@ -4080,7 +4081,7 @@ private static native long segmentedGather(long sourceColumnHandle, long gatherM private static native boolean containsScalar(long columnViewHaystack, long scalarHandle) throws CudfException; - private static native long containsVector(long columnViewHaystack, long columnViewNeedles) throws CudfException; + private static native long containsVector(long valuesHandle, long searchSpaceHandle) throws CudfException; private static native long transform(long viewHandle, String udf, boolean isPtx); diff --git a/java/src/main/native/src/ColumnViewJni.cpp b/java/src/main/native/src/ColumnViewJni.cpp index e074180c312..b33769bdc1b 100644 --- a/java/src/main/native/src/ColumnViewJni.cpp +++ b/java/src/main/native/src/ColumnViewJni.cpp @@ -1166,15 +1166,18 @@ JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ColumnView_containsScalar(JNIEnv } JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnView_containsVector(JNIEnv *env, jobject j_object, - jlong j_haystack_handle, - jlong j_needle_handle) { - JNI_NULL_CHECK(env, j_haystack_handle, "haystack vector is null", false); - JNI_NULL_CHECK(env, j_needle_handle, "needle vector is null", false); + jlong j_values_handle, + jlong j_search_space_handle) { + JNI_NULL_CHECK(env, j_values_handle, "values vector is null", false); + 
JNI_NULL_CHECK(env, j_search_space_handle, "search_space vector is null", false); try { cudf::jni::auto_set_device(env); - cudf::column_view *haystack = reinterpret_cast(j_haystack_handle); - cudf::column_view *needle = reinterpret_cast(j_needle_handle); - return release_as_jlong(cudf::contains(*haystack, *needle)); + auto const search_space_ptr = + reinterpret_cast(j_search_space_handle); + auto const values_ptr = reinterpret_cast(j_values_handle); + + // The C++ API `cudf::contains` requires that the search space is the first parameter. + return release_as_jlong(cudf::contains(*search_space_ptr, *values_ptr)); } CATCH_STD(env, 0); } diff --git a/python/cudf/cudf/_lib/cpp/search.pxd b/python/cudf/cudf/_lib/cpp/search.pxd index 4df73881ea5..8baef0aa1b9 100644 --- a/python/cudf/cudf/_lib/cpp/search.pxd +++ b/python/cudf/cudf/_lib/cpp/search.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector @@ -12,15 +12,15 @@ from cudf._lib.cpp.table.table_view cimport table_view cdef extern from "cudf/search.hpp" namespace "cudf" nogil: cdef unique_ptr[column] lower_bound( - table_view t, - table_view values, + table_view haystack, + table_view needles, vector[libcudf_types.order] column_order, vector[libcudf_types.null_order] null_precedence, ) except + cdef unique_ptr[column] upper_bound( - table_view t, - table_view values, + table_view haystack, + table_view needles, vector[libcudf_types.order] column_order, vector[libcudf_types.null_order] null_precedence, ) except + diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index 70097f15372..09a4754f519 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -5407,7 +5407,7 @@ def fillna( def _find_first_and_last(self, value: ScalarLike) -> Tuple[int, int]: found_indices = libcudf.search.contains( - self, 
column.as_column([value], dtype=self.dtype) + column.as_column([value], dtype=self.dtype), self ) found_indices = libcudf.unary.cast(found_indices, dtype=np.int32) first = column.as_column(found_indices).find_first_value(np.int32(1)) From dc0c3cd0cdca4a5cf2d47a79099fb846de4d604c Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 10 May 2022 16:58:42 -0500 Subject: [PATCH 4/6] Use `ThreadedMotoServer` instead of `subprocess` in spinning up `s3` server (#10822) This is a follow-up PR to address review comments from here: https://github.com/rapidsai/cudf/pull/10769#pullrequestreview-968047334 This PR: - [x] Uses `ThreadedMotoServer` instead of using `subprocess.open` to create a new server, this way it is guaranteed to close the server upon exit. - [x] Add's IP address fixture instead of having it hard-coded at multiple places. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Bradley Dice (https://github.com/bdice) - Charles Blackmon-Luca (https://github.com/charlesbluca) URL: https://github.com/rapidsai/cudf/pull/10822 --- python/cudf/cudf/tests/test_s3.py | 40 +++++++------------ .../dask_cudf/dask_cudf/io/tests/test_s3.py | 38 ++++++------------ 2 files changed, 27 insertions(+), 51 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 0966bee93fd..b754429555d 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -1,10 +1,7 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. 
import os -import shlex import socket -import subprocess -import time from contextlib import contextmanager from io import BytesIO @@ -21,9 +18,15 @@ moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") -requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer + + +@pytest.fixture(scope="session") +def endpoint_ip(): + return "127.0.0.1" + @pytest.fixture(scope="session") def endpoint_port(): @@ -51,7 +54,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(endpoint_port): +def s3_base(endpoint_ip, endpoint_port): """ Fixture to set up moto server in separate process """ @@ -65,35 +68,20 @@ def s3_base(endpoint_port): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" - - proc = subprocess.Popen( - shlex.split(f"moto_server s3 -p {endpoint_port}"), - ) + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" - timeout = 5 - while timeout > 0: - try: - # OK to go once server is accepting connections - r = requests.get(endpoint_uri) - if r.ok: - break - except Exception: - pass - timeout -= 0.1 - time.sleep(0.1) + server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) + server.start() yield endpoint_uri - - proc.terminate() - proc.wait() + server.stop() @pytest.fixture() -def s3so(endpoint_port): +def s3so(endpoint_ip, endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 9283380296c..5be0cf7c887 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ 
b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,10 +1,7 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. import os -import shlex import socket -import subprocess -import time from contextlib import contextmanager from io import BytesIO @@ -16,8 +13,13 @@ moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") -requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer + + +@pytest.fixture(scope="session") +def endpoint_ip(): + return "127.0.0.1" @pytest.fixture(scope="session") @@ -46,7 +48,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(endpoint_port): +def s3_base(endpoint_ip, endpoint_port): """ Fixture to set up moto server in separate process """ @@ -59,35 +61,21 @@ def s3_base(endpoint_port): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" - proc = subprocess.Popen( - shlex.split(f"moto_server s3 -p {endpoint_port}"), - ) + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" - timeout = 5 - while timeout > 0: - try: - # OK to go once server is accepting connections - r = requests.get(endpoint_uri) - if r.ok: - break - except Exception: - pass - timeout -= 0.1 - time.sleep(0.1) + server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) + server.start() yield endpoint_uri - - proc.terminate() - proc.wait() + server.stop() @pytest.fixture() -def s3so(endpoint_port): +def s3so(endpoint_ip, endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} From 366206d4a04e77bc3fbc9b41948ddb816d4f38e3 Mon Sep 17 00:00:00 2001 From: brandon-b-miller 
<53796099+brandon-b-miller@users.noreply.github.com> Date: Tue, 10 May 2022 17:05:18 -0500 Subject: [PATCH 5/6] Import `NA` from `missing` rather than using `cudf.NA` everywhere (#10821) This PR changes cuDF so `NA` isn't used around the codebase from the top level `cudf` namespace and rather is imported directly from `missing`. This is part of https://github.com/rapidsai/cudf/issues/10820 and comes as a follow up to https://github.com/rapidsai/cudf/pull/10791#discussion_r867206392 Authors: - https://github.com/brandon-b-miller Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/10821 --- python/cudf/cudf/_lib/scalar.pyx | 23 ++++++++++--------- python/cudf/cudf/core/_internals/where.py | 5 ++-- python/cudf/cudf/core/column/column.py | 3 ++- python/cudf/cudf/core/column/lists.py | 5 ++-- .../cudf/cudf/core/column/numerical_base.py | 3 ++- python/cudf/cudf/core/column/struct.py | 3 ++- python/cudf/cudf/core/dataframe.py | 11 ++++----- python/cudf/cudf/testing/testing.py | 3 ++- python/cudf/cudf/utils/dtypes.py | 3 ++- 9 files changed, 32 insertions(+), 27 deletions(-) diff --git a/python/cudf/cudf/_lib/scalar.pyx b/python/cudf/cudf/_lib/scalar.pyx index 71ac022ba2d..6309720706b 100644 --- a/python/cudf/cudf/_lib/scalar.pyx +++ b/python/cudf/cudf/_lib/scalar.pyx @@ -27,6 +27,7 @@ from cudf._lib.types import ( duration_unit_map, ) from cudf.core.dtypes import ListDtype, StructDtype +from cudf.core.missing import NA from cudf._lib.column cimport Column from cudf._lib.cpp.column.column_view cimport column_view @@ -170,7 +171,7 @@ cdef class DeviceScalar: return self.get_raw_ptr()[0].is_valid() def __repr__(self): - if self.value is cudf.NA: + if self.value is NA: return ( f"{self.__class__.__name__}" f"({self.value}, {repr(self.dtype)})" @@ -356,7 +357,7 @@ cdef _set_struct_from_pydict(unique_ptr[scalar]& s, else: pyarrow_table = pa.Table.from_arrays( [ - pa.array([cudf.NA], 
from_pandas=True, type=f.type) + pa.array([NA], from_pandas=True, type=f.type) for f in arrow_schema ], names=columns @@ -371,7 +372,7 @@ cdef _set_struct_from_pydict(unique_ptr[scalar]& s, cdef _get_py_dict_from_struct(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA cdef table_view struct_table_view = (s.get()).view() column_names = [str(i) for i in range(struct_table_view.num_columns())] @@ -386,7 +387,7 @@ cdef _set_list_from_pylist(unique_ptr[scalar]& s, object dtype, bool valid=True): - value = value if valid else [cudf.NA] + value = value if valid else [NA] cdef Column col if isinstance(dtype.element_type, ListDtype): pa_type = dtype.element_type.to_arrow() @@ -404,7 +405,7 @@ cdef _set_list_from_pylist(unique_ptr[scalar]& s, cdef _get_py_list_from_list(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA cdef column_view list_col_view = (s.get()).view() cdef Column list_col = Column.from_column_view(list_col_view, None) @@ -416,14 +417,14 @@ cdef _get_py_list_from_list(unique_ptr[scalar]& s): cdef _get_py_string_from_string(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA return (s.get())[0].to_string().decode() cdef _get_np_scalar_from_numeric(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -456,7 +457,7 @@ cdef _get_np_scalar_from_numeric(unique_ptr[scalar]& s): cdef _get_py_decimal_from_fixed_point(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -480,7 +481,7 @@ cdef _get_np_scalar_from_timestamp64(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -571,7 +572,7 @@ def as_device_scalar(val, dtype=None): def 
_is_null_host_scalar(slr): - if slr is None or slr is cudf.NA: + if slr is None or slr is NA: return True elif isinstance(slr, (np.datetime64, np.timedelta64)) and np.isnat(slr): return True @@ -603,5 +604,5 @@ def _nested_na_replace(input_list): if isinstance(value, list): _nested_na_replace(value) elif value is None: - input_list[idx] = cudf.NA + input_list[idx] = NA return input_list diff --git a/python/cudf/cudf/core/_internals/where.py b/python/cudf/cudf/core/_internals/where.py index 59e7d629092..bc01752a2b4 100644 --- a/python/cudf/cudf/core/_internals/where.py +++ b/python/cudf/cudf/core/_internals/where.py @@ -12,6 +12,7 @@ from cudf.core.dataframe import DataFrame from cudf.core.frame import Frame from cudf.core.index import Index +from cudf.core.missing import NA from cudf.core.series import Series from cudf.core.single_column_frame import SingleColumnFrame @@ -28,9 +29,7 @@ def _normalize_scalars(col: ColumnBase, other: ScalarLike) -> ScalarLike: f"{type(other).__name__} to {col.dtype.name}" ) - return cudf.Scalar( - other, dtype=col.dtype if other in {None, cudf.NA} else None - ) + return cudf.Scalar(other, dtype=col.dtype if other in {None, NA} else None) def _check_and_cast_columns_with_other( diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index e1d91e6d0c0..47a2e3489e8 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -68,6 +68,7 @@ ListDtype, StructDtype, ) +from cudf.core.missing import NA from cudf.core.mixins import BinaryOperand, Reducible from cudf.utils.dtypes import ( cudf_dtype_from_pa_type, @@ -499,7 +500,7 @@ def __setitem__(self, key: Any, value: Any): self._mimic_inplace(out, inplace=True) def _wrap_binop_normalization(self, other): - if other is cudf.NA or other is None: + if other is NA or other is None: return cudf.Scalar(other, dtype=self.dtype) if isinstance(other, np.ndarray) and other.ndim == 0: other = other.item() diff --git 
a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index 30e418f0825..e8a5638f07a 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -30,6 +30,7 @@ from cudf.core.column import ColumnBase, as_column, column from cudf.core.column.methods import ColumnMethods, ParentType from cudf.core.dtypes import ListDtype +from cudf.core.missing import NA class ListColumn(ColumnBase): @@ -91,7 +92,7 @@ def __setitem__(self, key, value): if isinstance(value, cudf.Scalar): if value.dtype != self.dtype: raise TypeError("list nesting level mismatch") - elif value is cudf.NA: + elif value is NA: value = cudf.Scalar(value, dtype=self.dtype) else: raise ValueError(f"Can not set {value} into ListColumn") @@ -354,7 +355,7 @@ def get( index = as_column(index) out = extract_element_column(self._column, as_column(index)) - if not (default is None or default is cudf.NA): + if not (default is None or default is NA): # determine rows for which `index` is out-of-bounds lengths = count_elements(self._column) out_of_bounds_mask = (np.negative(index) > lengths) | ( diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index 659bb58d790..bb7711a3ead 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -11,6 +11,7 @@ from cudf import _lib as libcudf from cudf._typing import ScalarLike from cudf.core.column import ColumnBase +from cudf.core.missing import NA from cudf.core.mixins import Scannable @@ -116,7 +117,7 @@ def quantile( scalar_result = result.element_indexing(0) return ( cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - if scalar_result is cudf.NA + if scalar_result is NA else scalar_result ) return result diff --git a/python/cudf/cudf/core/column/struct.py b/python/cudf/cudf/core/column/struct.py index ed5e1c9450d..fa834ae8a5a 100644 --- a/python/cudf/cudf/core/column/struct.py +++ 
b/python/cudf/cudf/core/column/struct.py @@ -10,6 +10,7 @@ from cudf.core.column import ColumnBase, build_struct_column from cudf.core.column.methods import ColumnMethods from cudf.core.dtypes import StructDtype +from cudf.core.missing import NA class StructColumn(ColumnBase): @@ -102,7 +103,7 @@ def __setitem__(self, key, value): if isinstance(value, dict): # filling in fields not in dict for field in self.dtype.fields: - value[field] = value.get(field, cudf.NA) + value[field] = value.get(field, NA) value = cudf.Scalar(value, self.dtype) super().__setitem__(key, value) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index a3e2f40b28e..0c3dc82719e 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -76,6 +76,7 @@ _indices_from_labels, doc_reset_index_template, ) +from cudf.core.missing import NA from cudf.core.multiindex import MultiIndex from cudf.core.resample import DataFrameResampler from cudf.core.series import Series @@ -364,9 +365,7 @@ def _setitem_tuple_arg(self, key, value): scatter_map = _indices_from_labels(self._frame, key[0]) for col in columns_df._column_names: columns_df[col][scatter_map] = ( - value._data[col] - if col in value_column_names - else cudf.NA + value._data[col] if col in value_column_names else NA ) else: @@ -479,7 +478,7 @@ def _setitem_tuple_arg(self, key, value): value_column_names = set(value._column_names) for col in columns_df._column_names: columns_df[col][key[0]] = ( - value._data[col] if col in value_column_names else cudf.NA + value._data[col] if col in value_column_names else NA ) else: @@ -3867,8 +3866,8 @@ def applymap( # bytecode to generate the equivalent PTX # as a null-ignoring version of the function def _func(x): # pragma: no cover - if x is cudf.NA: - return cudf.NA + if x is NA: + return NA else: return devfunc(x) diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index b134d2b26e9..070e4649c7b 100644 --- 
a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -20,6 +20,7 @@ is_struct_dtype, ) from cudf.core._compat import PANDAS_GE_110 +from cudf.core.missing import NA def dtype_can_compare_equal_to_other(dtype): @@ -290,7 +291,7 @@ def assert_column_equal( def null_safe_scalar_equals(left, right): - if left in {cudf.NA, np.nan} or right in {cudf.NA, np.nan}: + if left in {NA, np.nan} or right in {NA, np.nan}: return left is right return left == right diff --git a/python/cudf/cudf/utils/dtypes.py b/python/cudf/cudf/utils/dtypes.py index 35c6fdc73f8..c2d9a57b72f 100644 --- a/python/cudf/cudf/utils/dtypes.py +++ b/python/cudf/cudf/utils/dtypes.py @@ -12,6 +12,7 @@ import cudf from cudf.core._compat import PANDAS_GE_120 +from cudf.core.missing import NA _NA_REP = "" @@ -591,7 +592,7 @@ def _can_cast(from_dtype, to_dtype): `np.can_cast` but with some special handling around cudf specific dtypes. """ - if from_dtype in {None, cudf.NA}: + if from_dtype in {None, NA}: return True if isinstance(from_dtype, type): from_dtype = cudf.dtype(from_dtype) From 2aaa863ee78f4cdc8d7a0180abe82bdad4cd7923 Mon Sep 17 00:00:00 2001 From: Peixin Date: Wed, 11 May 2022 07:36:34 +0800 Subject: [PATCH 6/6] Add cudf JNI docker build github action (#10806) Signed-off-by: Peixin Li related to https://github.com/NVIDIA/spark-rapids-jni/issues/203 I don't know if any restriction of using github actions in rapids org, so this PR is just a prototype and waiting for feedback :) Example in my forked repo, 1. Manual trigger the build ![image](https://user-images.githubusercontent.com/8086184/167101308-b3e4376e-4c42-4603-aca1-2b0f40ace16f.png) 2. example logs https://github.com/pxLi/cudf/runs/6319840419?check_suite_focus=true 3. pushed images https://hub.docker.com/r/pxli/cudf-jni-build/tags ![image](https://user-images.githubusercontent.com/8086184/167102403-a5b49a99-d8de-46d6-b5df-a1ea50254448.png) Some open questions: 1. Trigger strategy? 
Manual, crontab, merge trigger, or others? 2. which org in docker hub should we push the image to? gpuci? 3. which account we should use to push? 4. do we want to keep the commit tags or we just overwrite the branch-XY tag only? mostly about space :) Thanks! Authors: - Peixin (https://github.com/pxLi) Approvers: - Jason Lowe (https://github.com/jlowe) - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/cudf/pull/10806 --- .github/workflows/jni-docker-build.yml | 53 ++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 .github/workflows/jni-docker-build.yml diff --git a/.github/workflows/jni-docker-build.yml b/.github/workflows/jni-docker-build.yml new file mode 100644 index 00000000000..0bdc409d0ab --- /dev/null +++ b/.github/workflows/jni-docker-build.yml @@ -0,0 +1,53 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +name: JNI Docker Build + +on: + workflow_dispatch: # manual trigger only + +concurrency: + group: jni-docker-build-${{ github.ref }} + cancel-in-progress: true + +jobs: + docker-build: + if: github.repository == 'rapidsai/cudf' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.GPUCIBOT_DOCKERHUB_USER }} + password: ${{ secrets.GPUCIBOT_DOCKERHUB_TOKEN }} + + - name: Set ENVs + run: | + echo "IMAGE_NAME=rapidsai/cudf-jni-build" >> $GITHUB_ENV + echo "IMAGE_REF=${GITHUB_REF_NAME}" >> $GITHUB_ENV + + - name: Build and Push + uses: docker/build-push-action@v3 + with: + push: true + file: java/ci/Dockerfile.centos7 + tags: "${{ env.IMAGE_NAME }}:${{ env.IMAGE_REF }}"