From d78d565b15bd9a2e3200176af4656ee2098b209b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Tue, 21 May 2024 07:57:11 -1000 Subject: [PATCH 1/5] Avoid index-to-column conversion in some DataFrame ops (#15763) xref https://github.com/rapidsai/cudf/pull/15494 * For `Index.str`, check the `dtype` instead of the underlying column type (which would materialize RangeIndex) * For `set_index`, don't immediately convert passed objects to column until necessary * For `_make_operands_and_index_for_binop`, don't create pandas object more than once Authors: - Matthew Roeschke (https://github.com/mroeschke) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/15763 --- python/cudf/cudf/core/dataframe.py | 109 +++++++++++------------------ python/cudf/cudf/core/index.py | 3 +- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 88b1ae2ea22..0b7c40ff516 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2047,29 +2047,24 @@ def _make_operands_and_index_for_binop( equal_columns = True elif isinstance(other, Series): if ( - not can_reindex - and fn in cudf.utils.utils._EQUALITY_OPS - and ( - not self._data.to_pandas_index().equals( - other.index.to_pandas() - ) + not (self_pd_columns := self._data.to_pandas_index()).equals( + other_pd_index := other.index.to_pandas() ) + and not can_reindex + and fn in cudf.utils.utils._EQUALITY_OPS ): raise ValueError( "Can only compare DataFrame & Series objects " "whose columns & index are same respectively, " "please reindex." ) - rhs = dict(zip(other.index.to_pandas(), other.values_host)) + rhs = dict(zip(other_pd_index, other.values_host)) # For keys in right but not left, perform binops between NaN (not # NULL!) and the right value (result is NaN). 
left_default = as_column(np.nan, length=len(self)) - equal_columns = other.index.to_pandas().equals( - self._data.to_pandas_index() - ) + equal_columns = other_pd_index.equals(self_pd_columns) can_use_self_column_name = ( - equal_columns - or list(other._index._data.names) == self._data._level_names + equal_columns or other_pd_index.names == self_pd_columns.names ) elif isinstance(other, DataFrame): if ( @@ -2952,82 +2947,60 @@ def set_index( if not isinstance(keys, list): keys = [keys] + if len(keys) == 0: + raise ValueError("No valid columns to be added to index.") + if append: + keys = [self.index] + keys # Preliminary type check - col_not_found = [] - columns_to_add = [] + labels_not_found = [] + data_to_add = [] names = [] to_drop = [] for col in keys: - # Is column label + # label-like if is_scalar(col) or isinstance(col, tuple): if col in self._column_names: - columns_to_add.append(self[col]) + data_to_add.append(self[col]) names.append(col) if drop: to_drop.append(col) else: - col_not_found.append(col) + labels_not_found.append(col) + # index-like + elif isinstance(col, (MultiIndex, pd.MultiIndex)): + if isinstance(col, pd.MultiIndex): + col = MultiIndex.from_pandas(col) + data_to_add.extend(col._data.columns) + names.extend(col.names) + elif isinstance( + col, (cudf.Series, cudf.Index, pd.Series, pd.Index) + ): + data_to_add.append(col) + names.append(col.name) else: - # Try coerce into column - if not is_column_like(col): - try: - col = as_column(col) - except TypeError: - msg = f"{col} cannot be converted to column-like." - raise TypeError(msg) - if isinstance(col, (MultiIndex, pd.MultiIndex)): - col = ( - cudf.from_pandas(col) - if isinstance(col, pd.MultiIndex) - else col - ) - cols = [col._data[x] for x in col._data] - columns_to_add.extend(cols) - names.extend(col.names) - else: - if isinstance(col, (pd.RangeIndex, cudf.RangeIndex)): - # Corner case: RangeIndex does not need to instantiate - columns_to_add.append(col) - else: - # For pandas obj, convert to gpu obj - columns_to_add.append(as_column(col)) - if isinstance( - col, (cudf.Series, cudf.Index, pd.Series, pd.Index) - ): - names.append(col.name) - else: - names.append(None) - - if col_not_found: - raise KeyError(f"None of {col_not_found} are in the columns") + try: + col = as_column(col) + except TypeError as err: + msg = f"{col} cannot be converted to column-like." 
+ raise TypeError(msg) from err + data_to_add.append(col) + names.append(None) - if append: - idx_cols = [self.index._data[x] for x in self.index._data] - if isinstance(self.index, MultiIndex): - idx_names = self.index.names - else: - idx_names = [self.index.name] - columns_to_add = idx_cols + columns_to_add - names = idx_names + names + if labels_not_found: + raise KeyError(f"None of {labels_not_found} are in the columns") - if len(columns_to_add) == 0: - raise ValueError("No valid columns to be added to index.") - elif ( - len(columns_to_add) == 1 + if ( + len(data_to_add) == 1 and len(keys) == 1 and not isinstance(keys[0], (cudf.MultiIndex, pd.MultiIndex)) ): - idx = cudf.Index(columns_to_add[0], name=names[0]) + # Don't turn single level MultiIndex into an Index + idx = cudf.Index(data_to_add[0], name=names[0]) else: - idx = MultiIndex._from_data( - {i: col for i, col in enumerate(columns_to_add)} - ) + idx = MultiIndex._from_data(dict(enumerate(data_to_add))) idx.names = names - if not isinstance(idx, BaseIndex): - raise ValueError("Parameter index should be type `Index`.") - df = self if inplace else self.copy(deep=True) if verify_integrity and not idx.is_unique: diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 209e582e5d6..49bfb150f60 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -38,6 +38,7 @@ is_integer, is_list_like, is_scalar, + is_string_dtype, ) from cudf.core._base_index import BaseIndex, _return_get_indexer_result from cudf.core._compat import PANDAS_LT_300 @@ -1623,7 +1624,7 @@ def _indices_of(self, value): @property @_cudf_nvtx_annotate def str(self): - if isinstance(self._values, cudf.core.column.StringColumn): + if is_string_dtype(self.dtype): return StringMethods(parent=self) else: raise AttributeError( From 2c70971ecc66960dcf4bfb2fc6618c7f9f60980f Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 21 May 2024 16:36:01 -0500 Subject: [PATCH 2/5] Upgrade `arrow` to 16.1 (#15787) This PR upgrades arrow to 16.1 Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Bradley Dice (https://github.com/bdice) - Ray Douglass (https://github.com/raydouglass) - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/15787 --- .../all_cuda-118_arch-x86_64.yaml | 10 ++++----- .../all_cuda-122_arch-x86_64.yaml | 10 ++++----- conda/recipes/cudf/meta.yaml | 4 ++-- conda/recipes/libcudf/conda_build_config.yaml | 2 +- conda/recipes/libcudf/meta.yaml | 9 -------- cpp/cmake/thirdparty/get_arrow.cmake | 2 +- dependencies.yaml | 22 +++++++++---------- python/cudf/cudf/tests/test_orc.py | 3 +++ python/cudf/pyproject.toml | 4 ++-- python/cudf_kafka/pyproject.toml | 2 +- 10 files changed, 31 insertions(+), 37 deletions(-) diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 48699b81eed..804b09bab59 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -36,15 +36,15 @@ dependencies: - hypothesis - identify>=2.5.20 - ipython -- libarrow-acero==16.0.0.* -- libarrow-dataset==16.0.0.* -- libarrow==16.0.0.* +- libarrow-acero==16.1.0.* +- libarrow-dataset==16.1.0.* +- libarrow==16.1.0.* - libcufile-dev=1.4.0.31 - libcufile=1.4.0.31 - libcurand-dev=10.3.0.86 - libcurand=10.3.0.86 - libkvikio==24.6.* -- libparquet==16.0.0.* +- libparquet==16.1.0.* - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.6.* - make @@ 
-66,7 +66,7 @@ dependencies: - pip - pre-commit - ptxcompiler -- pyarrow==16.0.0.* +- pyarrow==16.1.0.* - pydata-sphinx-theme!=0.14.2 - pytest-benchmark - pytest-cases>=3.8.2 diff --git a/conda/environments/all_cuda-122_arch-x86_64.yaml b/conda/environments/all_cuda-122_arch-x86_64.yaml index d06a727f331..89eac98f652 100644 --- a/conda/environments/all_cuda-122_arch-x86_64.yaml +++ b/conda/environments/all_cuda-122_arch-x86_64.yaml @@ -37,13 +37,13 @@ dependencies: - hypothesis - identify>=2.5.20 - ipython -- libarrow-acero==16.0.0.* -- libarrow-dataset==16.0.0.* -- libarrow==16.0.0.* +- libarrow-acero==16.1.0.* +- libarrow-dataset==16.1.0.* +- libarrow==16.1.0.* - libcufile-dev - libcurand-dev - libkvikio==24.6.* -- libparquet==16.0.0.* +- libparquet==16.1.0.* - librdkafka>=1.9.0,<1.10.0a0 - librmm==24.6.* - make @@ -63,7 +63,7 @@ dependencies: - pandoc - pip - pre-commit -- pyarrow==16.0.0.* +- pyarrow==16.1.0.* - pydata-sphinx-theme!=0.14.2 - pynvjitlink - pytest-benchmark diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index 12e29c77a98..e7245e67659 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -64,7 +64,7 @@ requirements: - scikit-build-core >=0.7.0 - dlpack >=0.8,<1.0 - numpy 1.23 - - pyarrow ==16.0.0.* + - pyarrow ==16.1.0.* - libcudf ={{ version }} - rmm ={{ minor_version }} {% if cuda_major == "11" %} @@ -82,7 +82,7 @@ requirements: - cupy >=12.0.0 - numba >=0.57 - {{ pin_compatible('numpy', max_pin='x') }} - - {{ pin_compatible('pyarrow', max_pin='x') }} + - {{ pin_compatible('pyarrow', max_pin='x.x') }} - libcudf ={{ version }} - {{ pin_compatible('rmm', max_pin='x.x') }} - fsspec >=0.6.0 diff --git a/conda/recipes/libcudf/conda_build_config.yaml b/conda/recipes/libcudf/conda_build_config.yaml index 61ffcf3c3de..c01178bf732 100644 --- a/conda/recipes/libcudf/conda_build_config.yaml +++ b/conda/recipes/libcudf/conda_build_config.yaml @@ -20,7 +20,7 @@ cmake_version: - ">=3.26.4" libarrow_version: - - "==16.0.0" + - "==16.1.0" dlpack_version: - ">=0.8,<1.0" diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index ad2e840c71d..76115362b6c 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -86,9 +86,6 @@ outputs: {% else %} - {{ compiler('cuda') }} {% endif %} - # TODO: start taking libarrow's run exports again wwhen they're correct for 16.0 - # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 - - libarrow requirements: build: - cmake {{ cmake_version }} @@ -108,12 +105,6 @@ outputs: - librmm ={{ minor_version }} - libkvikio ={{ minor_version }} - dlpack {{ dlpack_version }} - # TODO: start taking libarrow's run exports again wwhen they're correct for 16.0 - # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 - - libarrow>=16.0.0,<16.1.0a0 - - libarrow-acero>=16.0.0,<16.1.0a0 - - libarrow-dataset>=16.0.0,<16.1.0a0 - - libparquet>=16.0.0,<16.1.0a0 test: commands: - test -f $PREFIX/lib/libcudf.so diff --git a/cpp/cmake/thirdparty/get_arrow.cmake b/cpp/cmake/thirdparty/get_arrow.cmake index 73e66cce608..0afdc526981 100644 --- a/cpp/cmake/thirdparty/get_arrow.cmake +++ b/cpp/cmake/thirdparty/get_arrow.cmake @@ -430,7 +430,7 @@ if(NOT DEFINED CUDF_VERSION_Arrow) set(CUDF_VERSION_Arrow # This version must be kept in sync with the libarrow version pinned for builds in # dependencies.yaml. 
- 16.0.0 + 16.1.0 CACHE STRING "The version of Arrow to find (or build)" ) endif() diff --git a/dependencies.yaml b/dependencies.yaml index f20c1591e73..0844d86fb66 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -289,7 +289,7 @@ dependencies: - cython>=3.0.3 # Hard pin the patch version used during the build. This must be kept # in sync with the version pinned in get_arrow.cmake. - - pyarrow==16.0.0.* + - pyarrow==16.1.0.* - output_types: conda packages: - scikit-build-core>=0.7.0 @@ -332,25 +332,25 @@ dependencies: packages: # Hard pin the Arrow patch version used during the build. This must # be kept in sync with the version pinned in get_arrow.cmake. - - libarrow-acero==16.0.0.* - - libarrow-dataset==16.0.0.* - - libarrow==16.0.0.* - - libparquet==16.0.0.* + - libarrow-acero==16.1.0.* + - libarrow-dataset==16.1.0.* + - libarrow==16.1.0.* + - libparquet==16.1.0.* libarrow_run: common: - output_types: conda packages: - # Allow runtime version to float up to minor version - - libarrow-acero>=16.0.0,<16.1.0a0 - - libarrow-dataset>=16.0.0,<16.1.0a0 - - libarrow>=16.0.0,<16.1.0a0 - - libparquet>=16.0.0,<16.1.0a0 + # Allow runtime version to float up to patch version + - libarrow-acero>=16.1.0,<16.2.0a0 + - libarrow-dataset>=16.1.0,<16.2.0a0 + - libarrow>=16.1.0,<16.2.0a0 + - libparquet>=16.1.0,<16.2.0a0 pyarrow_run: common: - output_types: [conda, requirements, pyproject] packages: # Allow runtime version to float up to patch version - - pyarrow>=16.0.0,<16.1.0a0 + - pyarrow>=16.1.0,<16.2.0a0 cuda_version: specific: - output_types: conda diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index a9bca7d8b98..83b7353ad89 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1833,6 +1833,9 @@ def test_orc_writer_negative_timestamp(negative_timestamp_df): ) +@pytest.mark.skip( + reason="Bug specific to rockylinux8: https://github.com/rapidsai/cudf/issues/15802", +) def test_orc_reader_apache_negative_timestamp(datadir): path = datadir / "TestOrcFile.apache_timestamp.orc" diff --git a/python/cudf/pyproject.toml b/python/cudf/pyproject.toml index 826362f0632..38aa6eeb24e 100644 --- a/python/cudf/pyproject.toml +++ b/python/cudf/pyproject.toml @@ -7,7 +7,7 @@ requires = [ "cython>=3.0.3", "ninja", "numpy==1.23.*", - "pyarrow==16.0.0.*", + "pyarrow==16.1.0.*", "rmm==24.6.*", "scikit-build-core[pyproject]>=0.7.0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. @@ -34,7 +34,7 @@ dependencies = [ "packaging", "pandas>=2.0,<2.2.3dev0", "ptxcompiler", - "pyarrow>=16.0.0,<16.1.0a0", + "pyarrow>=16.1.0,<16.2.0a0", "rich", "rmm==24.6.*", "typing_extensions>=4.0.0", diff --git a/python/cudf_kafka/pyproject.toml b/python/cudf_kafka/pyproject.toml index 787dd8a97d7..80e30e000c0 100644 --- a/python/cudf_kafka/pyproject.toml +++ b/python/cudf_kafka/pyproject.toml @@ -7,7 +7,7 @@ requires = [ "cython>=3.0.3", "ninja", "numpy==1.23.*", - "pyarrow==16.0.0.*", + "pyarrow==16.1.0.*", "scikit-build-core[pyproject]>=0.7.0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. From fea8fd611f38dc2610d97caded44b17905efbfa5 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Tue, 21 May 2024 17:51:38 -0500 Subject: [PATCH 3/5] Add multithreaded parquet reader benchmarks. 
(#15585)

Addresses: https://github.com/rapidsai/cudf/issues/12700

Adds multithreaded benchmarks for the parquet reader, with separate benchmarks
for the chunked and non-chunked readers. In both cases, the primary
configurations are 2, 4 and 8 threads running reads at the same time; there is
not a ton of variability in the other benchmarking axes. The primary use of
this particular benchmark is to see inter-kernel performance (that is, how
well our many different kernel types coexist with each other), whereas normal
benchmarks tend to be more for intra-kernel performance checking.

NVTX ranges are included to help visually group the bundles of reads together
in nsight-sys. I also posted a new issue which would help along these lines:
https://github.com/rapidsai/cudf/issues/15575

Update: I've tweaked some of the numbers to demonstrate some mild performance
improvements as we go up in thread count, and included 1 thread as a case.

Some examples:

```
## parquet_multithreaded_read_decode_mixed

| cardinality | total_data_size | num_threads | num_cols | bytes_per_second |
|-------------|-----------------|-------------|----------|------------------|
| 1000        | 536870912       | 1           | 4        | 28874731473      |
| 1000        | 1073741824      | 1           | 4        | 30564139526      |
| 1000        | 536870912       | 2           | 4        | 29399214255      |
| 1000        | 1073741824      | 2           | 4        | 31486327920      |
| 1000        | 536870912       | 4           | 4        | 27009769400      |
| 1000        | 1073741824      | 4           | 4        | 32234841632      |
| 1000        | 536870912       | 8           | 4        | 24416650118      |
| 1000        | 1073741824      | 8           | 4        | 30841124677      |
```

```
## parquet_multithreaded_read_decode_chunked_string

| cardinality | total_data_size | num_threads | num_cols | bytes_per_second |
|-------------|-----------------|-------------|----------|------------------|
| 1000        | 536870912       | 1           | 4        | 14637004584      |
| 1000        | 1073741824      | 1           | 4        | 16025843421      |
| 1000        | 536870912       | 2           | 4        | 15333491977      |
| 1000        | 1073741824      | 2           | 4        | 17164197747      |
| 1000        | 536870912       | 4           | 4        | 16556300728      |
| 1000        | 1073741824      | 4           | 4        | 17711338934      |
| 1000        | 536870912       | 8           | 4        | 15788371298      |
| 1000        | 1073741824      | 8           | 4        | 17911649578      |
```

In addition, this benchmark clearly shows multi-thread-only regressions. The
example case below uses the pageable-error-code regression we've seen in the
past.

Example without regression:

```
## parquet_multithreaded_read_decode_chunked_fixed_width

total_data_size | num_threads | bytes_per_second |
----------------|-------------|------------------|
536870912       | 1           | 25681728660      |
1073741824      | 1           | 26281335927      |
536870912       | 2           | 25597258848      |
1073741824      | 2           | 26733626352      |
536870912       | 4           | 25190211717      |
1073741824      | 4           | 28117411682      |
536870912       | 8           | 25805791994      |
1073741824      | 8           | 27788485204      |
```

Example with regression (pageable error-code return values):

```
## parquet_multithreaded_read_decode_chunked_fixed_width

total_data_size | num_threads | bytes_per_second |
----------------|-------------|------------------|
536870912       | 1           | 25660470283      |
1073741824      | 1           | 26146862480      |
536870912       | 2           | 25040145602      |
1073741824      | 2           | 25460591520      |
536870912       | 4           | 22917046969      |
1073741824      | 4           | 24922624784      |
536870912       | 8           | 20529770200      |
1073741824      | 8           | 23333751767      |
```

In both cases, we can see that the single-thread numbers remain the same, but
there's a regression in the multi-thread cases, particularly with 4 threads.
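For reference, the bytes_per_second column in these tables is total_data_size divided by the mean GPU time that nvbench reports (see the `state.add_element_count` call in the benchmark source below). A minimal sketch of that arithmetic, using a hypothetical timing value rather than a real measurement:

```python
# Illustrative only: how the throughput column relates to the measured time.
# The timing value below is made up, not from an actual run.
total_data_size = 536870912  # 512 MiB, one of the benchmark axis values
mean_gpu_time_s = 0.0186     # hypothetical stand-in for "nv/cold/time/gpu/mean"

bytes_per_second = total_data_size / mean_gpu_time_s
print(f"{bytes_per_second:.3e}")  # ~2.89e10, the same magnitude as the tables above
```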
Authors: - https://github.com/nvdbaranec - Mike Wilson (https://github.com/hyperbolic2346) Approvers: - Nghia Truong (https://github.com/ttnghia) - Vukasin Milovanovic (https://github.com/vuule) - Mike Wilson (https://github.com/hyperbolic2346) URL: https://github.com/rapidsai/cudf/pull/15585 --- cpp/benchmarks/CMakeLists.txt | 5 + cpp/benchmarks/io/cuio_common.hpp | 4 + .../io/parquet/parquet_reader_multithread.cpp | 351 ++++++++++++++++++ .../cudf}/utilities/thread_pool.hpp | 0 cpp/src/io/utilities/file_io_utilities.hpp | 4 +- 5 files changed, 362 insertions(+), 2 deletions(-) create mode 100644 cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp rename cpp/{src/io => include/cudf}/utilities/thread_pool.hpp (100%) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 4586a12f466..170cf27b72b 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -256,6 +256,11 @@ ConfigureNVBench( PARQUET_READER_NVBENCH io/parquet/parquet_reader_input.cpp io/parquet/parquet_reader_options.cpp ) +# ################################################################################################## +# * parquet multithread reader benchmark +# ---------------------------------------------------------------------- +ConfigureNVBench(PARQUET_MULTITHREAD_READER_NVBENCH io/parquet/parquet_reader_multithread.cpp) + # ################################################################################################## # * orc reader benchmark -------------------------------------------------------------------------- ConfigureNVBench(ORC_READER_NVBENCH io/orc/orc_reader_input.cpp io/orc/orc_reader_options.cpp) diff --git a/cpp/benchmarks/io/cuio_common.hpp b/cpp/benchmarks/io/cuio_common.hpp index 3d5be41e25f..6e0b32219ce 100644 --- a/cpp/benchmarks/io/cuio_common.hpp +++ b/cpp/benchmarks/io/cuio_common.hpp @@ -39,6 +39,10 @@ class cuio_source_sink_pair { // delete the temporary file std::remove(file_name.c_str()); } + // move constructor + cuio_source_sink_pair(cuio_source_sink_pair&& ss) = default; + cuio_source_sink_pair& operator=(cuio_source_sink_pair&& ss) = default; + /** * @brief Created a source info of the set type * diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp new file mode 100644 index 00000000000..fbdcfb0ade9 --- /dev/null +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/fixture/benchmark_fixture.hpp>
+#include <benchmarks/io/cuio_common.hpp>
+#include <benchmarks/io/nvbench_helpers.hpp>
+
+#include <cudf/detail/nvtx/ranges.hpp>
+#include <cudf/detail/utilities/stream_pool.hpp>
+#include <cudf/io/parquet.hpp>
+#include <cudf/table/table_view.hpp>
+#include <cudf/utilities/thread_pool.hpp>
+
+#include <rmm/mr/device/per_device_resource.hpp>
+#include <rmm/mr/device/pool_memory_resource.hpp>
+#include <rmm/mr/pinned_host_memory_resource.hpp>
+
+#include <nvtx3/nvtx3.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+#include <vector>
+
+// TODO: remove this once pinned/pooled is enabled by default in cuIO
+void set_cuio_host_pinned_pool()
+{
+  using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
+  static std::shared_ptr<host_pooled_mr> mr = std::make_shared<host_pooled_mr>(
+    std::make_shared<rmm::mr::pinned_host_memory_resource>().get(), 256ul * 1024 * 1024);
+  cudf::io::set_host_memory_resource(*mr);
+}
+
+size_t get_num_reads(nvbench::state const& state) { return state.get_int64("num_threads"); }
+
+size_t get_read_size(nvbench::state const& state)
+{
+  auto const num_reads = get_num_reads(state);
+  return state.get_int64("total_data_size") / num_reads;
+}
+
+std::string get_label(std::string const& test_name, nvbench::state const& state)
+{
+  auto const num_cols       = state.get_int64("num_cols");
+  size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
+  return {test_name + ", " + std::to_string(num_cols) + " columns, " +
+          std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
+          std::to_string(read_size_mb) + " MB each)"};
+}
+
+std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
+  nvbench::state& state, std::vector<cudf::type_id> const& d_types)
+{
+  cudf::size_type const cardinality = state.get_int64("cardinality");
+  cudf::size_type const run_length  = state.get_int64("run_length");
+  cudf::size_type const num_cols    = state.get_int64("num_cols");
+  size_t const num_files            = get_num_reads(state);
+  size_t const per_file_data_size   = get_read_size(state);
+
+  std::vector<cuio_source_sink_pair> source_sink_vector;
+
+  size_t total_file_size = 0;
+
+  for (size_t i = 0; i < num_files; ++i) {
+    cuio_source_sink_pair source_sink{cudf::io::io_type::HOST_BUFFER};
+
+    auto const tbl = create_random_table(
+      cycle_dtypes(d_types, num_cols),
+      table_size_bytes{per_file_data_size},
+      data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
+    auto const view = tbl->view();
+
+    cudf::io::parquet_writer_options write_opts =
+      cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
+        .compression(cudf::io::compression_type::SNAPPY)
+        .max_page_size_rows(50000)
+        .max_page_size_bytes(1024 * 1024);
+
+    cudf::io::write_parquet(write_opts);
+    total_file_size += source_sink.size();
+
+    source_sink_vector.push_back(std::move(source_sink));
+  }
+
+  return {std::move(source_sink_vector), total_file_size, num_files};
+}
+
+void BM_parquet_multithreaded_read_common(nvbench::state& state,
+                                          std::vector<cudf::type_id> const& d_types,
+                                          std::string const& label)
+{
+  size_t const data_size = state.get_int64("total_data_size");
+  auto const num_threads = state.get_int64("num_threads");
+
+  set_cuio_host_pinned_pool();
+
+  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
+  cudf::detail::thread_pool threads(num_threads);
+
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  nvtxRangePushA(("(read) " + label).c_str());
+  state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+             [&](nvbench::launch& launch, auto& timer) {
+               auto read_func = [&](int index) {
+                 auto const stream = streams[index % num_threads];
+                 auto& source_sink = source_sink_vector[index];
+                 cudf::io::parquet_reader_options read_opts =
+                   cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
+                 cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource());
+               };
+
+               threads.paused = true;
+               for (size_t i = 0; i < num_files; ++i) {
+                 threads.submit(read_func, i);
+               }
+               timer.start();
+               threads.paused = false;
+               threads.wait_for_tasks();
+               cudf::detail::join_streams(streams, cudf::get_default_stream());
+               timer.stop();
+             });
+  nvtxRangePop();
+
+  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+  state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
+}
+
+void BM_parquet_multithreaded_read_mixed(nvbench::state& state)
+{
+  auto label = get_label("mixed", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_fixed_width(nvbench::state& state)
+{
+  auto label = get_label("fixed width", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_string(nvbench::state& state)
+{
+  auto label = get_label("string", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_list(nvbench::state& state)
+{
+  auto label = get_label("list", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
+                                                  std::vector<cudf::type_id> const& d_types,
+                                                  std::string const& label)
+{
+  size_t const data_size    = state.get_int64("total_data_size");
+  auto const num_threads    = state.get_int64("num_threads");
+  size_t const input_limit  = state.get_int64("input_limit");
+  size_t const output_limit = state.get_int64("output_limit");
+
+  set_cuio_host_pinned_pool();
+
+  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
+  cudf::detail::thread_pool threads(num_threads);
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  nvtxRangePushA(("(read) " + label).c_str());
+  std::vector<cudf::io::table_with_metadata> chunks;
+  state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+             [&](nvbench::launch& launch, auto& timer) {
+               auto read_func = [&](int index) {
+                 auto const stream = streams[index % num_threads];
+                 auto& source_sink = source_sink_vector[index];
+                 cudf::io::parquet_reader_options read_opts =
+                   cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
+                 // divide chunk limits by number of threads so the number of chunks produced is the
+                 // same for all cases. this seems better than the alternative, which is to keep the
+                 // limits the same. if we do that, as the number of threads goes up, the number of
+                 // chunks goes down - so are we actually benchmarking the same thing in that case?
+                 auto reader = cudf::io::chunked_parquet_reader(
+                   output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                 // read all the chunks
+                 do {
+                   auto table = reader.read_chunk();
+                 } while (reader.has_next());
+               };
+
+               threads.paused = true;
+               for (size_t i = 0; i < num_files; ++i) {
+                 threads.submit(read_func, i);
+               }
+               timer.start();
+               threads.paused = false;
+               threads.wait_for_tasks();
+               cudf::detail::join_streams(streams, cudf::get_default_stream());
+               timer.stop();
+             });
+  nvtxRangePop();
+
+  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+  state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
+}
+
+void BM_parquet_multithreaded_read_chunked_mixed(nvbench::state& state)
+{
+  auto label = get_label("mixed", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state)
+{
+  auto label = get_label("fixed width", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_chunked_string(nvbench::state& state)
+{
+  auto label = get_label("string", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_chunked_list(nvbench::state& state)
+{
+  auto label = get_label("list", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label);
+  nvtxRangePop();
+}
+
+// mixed data types: fixed width and strings
+NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
+  .set_name("parquet_multithreaded_read_decode_mixed")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
+  .set_name("parquet_multithreaded_read_decode_fixed_width")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
+  .set_name("parquet_multithreaded_read_decode_string")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
+  .set_name("parquet_multithreaded_read_decode_list")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+// mixed data types: fixed width, strings
+NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
+  .set_name("parquet_multithreaded_read_decode_chunked_mixed")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8})
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
+  .set_name("parquet_multithreaded_read_decode_chunked_fixed_width")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8})
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
+  .set_name("parquet_multithreaded_read_decode_chunked_string")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8})
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
+  .set_name("parquet_multithreaded_read_decode_chunked_list")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8})
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
diff --git a/cpp/src/io/utilities/thread_pool.hpp b/cpp/include/cudf/utilities/thread_pool.hpp
similarity index 100%
rename from cpp/src/io/utilities/thread_pool.hpp
rename to cpp/include/cudf/utilities/thread_pool.hpp
diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp
index 74a2ae53961..91ef41fba6e 100644
--- a/cpp/src/io/utilities/file_io_utilities.hpp
+++ b/cpp/src/io/utilities/file_io_utilities.hpp
@@ -17,10 +17,10 @@
 #pragma once
 
 #ifdef CUFILE_FOUND
-#include "thread_pool.hpp"
-
 #include <cufile.h>
 
+#include <cudf/utilities/thread_pool.hpp>
+
 #include <string>
 #endif

From 9a0612b3add9c76ea8cb45cc230b75b2474d91f7 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Tue, 21 May 2024 17:09:00 -0700
Subject: [PATCH 4/5] Fix row group alignment in ORC writer (#15789)

Closes https://github.com/rapidsai/cudf/issues/15775

The ORC writer encodes null mask bits in multiples of eight to avoid issues
with other readers reading partially encoded bytes. When this does not align
with row groups, the null mask encode boundaries are moved to align to
multiples of eight. There was a bug in the alignment code that caused a
pointless shift by 8 bits, which in turn caused issues in encode. This PR
fixes the unnecessary shift.
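To make the fix concrete, here is a small standalone model (in Python, not the cudf source itself) of the borrow computation before and after the change; the variable names mirror the diff below, and the inputs are made-up examples:

```python
# Hypothetical model of the rowgroup-boundary alignment computation.
def bits_to_borrow_old(parent_valid_count, previously_borrowed):
    # Old form: yields 8 (a pointless full-byte shift) when the boundary
    # is already byte-aligned.
    return 8 - (parent_valid_count - previously_borrowed) % 8

def bits_to_borrow_new(parent_valid_count, previously_borrowed):
    if parent_valid_count < previously_borrowed:
        # Borrow enough to make an empty rowgroup
        return previously_borrowed - parent_valid_count
    misalignment = (parent_valid_count - previously_borrowed) % 8
    return (8 - misalignment) % 8

print(bits_to_borrow_old(16, 0), bits_to_borrow_new(16, 0))  # 8 vs 0: aligned case fixed
print(bits_to_borrow_old(13, 0), bits_to_borrow_new(13, 0))  # 3 vs 3: misaligned case unchanged
```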
Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: https://github.com/rapidsai/cudf/pull/15789
---
 cpp/src/io/orc/writer_impl.cu                 | 12 ++++++++++--
 .../data/orc/TestOrcFile.MapManyNulls.parquet | Bin 0 -> 6353 bytes
 python/cudf/cudf/tests/test_orc.py            | 13 +++++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)
 create mode 100644 python/cudf/cudf/tests/data/orc/TestOrcFile.MapManyNulls.parquet

diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu
index 750a593920c..344e216cdc8 100644
--- a/cpp/src/io/orc/writer_impl.cu
+++ b/cpp/src/io/orc/writer_impl.cu
@@ -782,8 +782,16 @@ std::vector<std::vector<rowgroup_rows>> calculate_aligned_rowgroup_bounds(
       } else {
         // pushdown mask present; null mask bits w/ set pushdown mask bits will be encoded
         // Use the number of set bits in pushdown mask as size
-        auto bits_to_borrow =
-          8 - (d_pd_set_counts[rg_idx][parent_col_idx] - previously_borrowed) % 8;
+        auto bits_to_borrow = [&]() {
+          auto const parent_valid_count = d_pd_set_counts[rg_idx][parent_col_idx];
+          if (parent_valid_count < previously_borrowed) {
+            // Borrow to make an empty rowgroup
+            return previously_borrowed - parent_valid_count;
+          }
+          auto const misalignment = (parent_valid_count - previously_borrowed) % 8;
+          return (8 - misalignment) % 8;
+        }();
+
         if (bits_to_borrow == 0) {
           // Didn't borrow any bits for this rowgroup
           previously_borrowed = 0;
diff --git a/python/cudf/cudf/tests/data/orc/TestOrcFile.MapManyNulls.parquet b/python/cudf/cudf/tests/data/orc/TestOrcFile.MapManyNulls.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..a80ce5fbd25837e1e25991dd14e9d2bed849c651
GIT binary patch
[base85-encoded binary data for the new 6353-byte test file omitted]

diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py
index 83b7353ad89..b83b8f08a8b 100644
--- a/python/cudf/cudf/tests/test_orc.py
+++ b/python/cudf/cudf/tests/test_orc.py
@@ -1954,3 +1954,16 @@ def test_writer_lz4():
     got = pd.read_orc(buffer)
 
     assert_eq(gdf, got)
+
+
+def test_row_group_alignment(datadir):
+    path = datadir / "TestOrcFile.MapManyNulls.parquet"
+
+    expected = cudf.read_parquet(path)
+
+    buffer = BytesIO()
+    expected.to_orc(buffer)
+
+    got = cudf.read_orc(buffer)
+
+    assert_eq(expected, got)

From 217c73f7d34c84f786707f335b95ed1aae94e87a Mon Sep 17 00:00:00 2001
From: GALI PREM SAGAR
Date: Wed, 22 May 2024 11:26:50 -0500
Subject: [PATCH 5/5] Deprecate `Groupby.collect` (#15808)

After we made our groupby fail more aggressively for unsupported types in
https://github.com/rapidsai/cudf/pull/15712, `Groupby.collect` started to
fail on string columns. This is not a supported aggregation on string columns
in pandas, and the method does not exist on pandas GroupBy, so this PR
suggests the equivalent alternative (`.agg(list)`) and deprecates the API for
removal in the next release.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: https://github.com/rapidsai/cudf/pull/15808
---
 python/cudf/cudf/core/groupby/groupby.py      | 12 ++++++++-
 python/dask_cudf/dask_cudf/expr/_groupby.py   | 26 +++++++++++--------
 python/dask_cudf/dask_cudf/groupby.py         | 23 +++++++++-------
 .../dask_cudf/dask_cudf/tests/test_groupby.py | 14 +++++++---
 4 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index 3e4b8192888..bf24864c29d 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -40,6 +40,15 @@
 from cudf.utils.utils import GetAttrGetItemMixin
 
 
+def _deprecate_collect():
+    warnings.warn(
+        "Groupby.collect is deprecated and "
+        "will be removed in a future version. "
+        "Use `.agg(list)` instead.",
+        FutureWarning,
+    )
+
+
 # The three functions below return the quantiles [25%, 50%, 75%]
 # respectively, which are called in the describe() method to output
 # the summary stats of a GroupBy object
@@ -2180,7 +2189,8 @@ def func(x):
     @_cudf_nvtx_annotate
     def collect(self):
         """Get a list of all the values for each column in each group."""
-        return self.agg("collect")
+        _deprecate_collect()
+        return self.agg(list)
 
     @_cudf_nvtx_annotate
     def unique(self):
diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py
index 116893891e3..65688115b59 100644
--- a/python/dask_cudf/dask_cudf/expr/_groupby.py
+++ b/python/dask_cudf/dask_cudf/expr/_groupby.py
@@ -9,19 +9,21 @@
 
 from dask.dataframe.groupby import Aggregation
 
+from cudf.core.groupby.groupby import _deprecate_collect
+
 ##
 ## Custom groupby classes
 ##
 
 
-class Collect(SingleAggregation):
+class ListAgg(SingleAggregation):
     @staticmethod
     def groupby_chunk(arg):
-        return arg.agg("collect")
+        return arg.agg(list)
 
     @staticmethod
     def groupby_aggregate(arg):
-        gb = arg.agg("collect")
+        gb = arg.agg(list)
         if gb.ndim > 1:
             for col in gb.columns:
                 gb[col] = gb[col].list.concat()
@@ -30,10 +32,10 @@ def groupby_aggregate(arg):
         return gb.list.concat()
 
 
-collect_aggregation = Aggregation(
-    name="collect",
-    chunk=Collect.groupby_chunk,
-    agg=Collect.groupby_aggregate,
+list_aggregation = Aggregation(
+    name="list",
+    chunk=ListAgg.groupby_chunk,
+    agg=ListAgg.groupby_aggregate,
 )
 
 
@@ -41,13 +43,13 @@ def _translate_arg(arg):
     # Helper function to translate args so that
     # they can be processed correctly by upstream
    # dask & dask-expr. Right now, the only necessary
-    # translation is "collect" aggregations.
+    # translation is list aggregations.
if isinstance(arg, dict): return {k: _translate_arg(v) for k, v in arg.items()} elif isinstance(arg, list): return [_translate_arg(x) for x in arg] elif arg in ("collect", "list", list): - return collect_aggregation + return list_aggregation else: return arg @@ -84,7 +86,8 @@ def __getitem__(self, key): return g def collect(self, **kwargs): - return self._single_agg(Collect, **kwargs) + _deprecate_collect() + return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): return super().aggregate(_translate_arg(arg), **kwargs) @@ -96,7 +99,8 @@ def __init__(self, *args, observed=None, **kwargs): super().__init__(*args, observed=observed, **kwargs) def collect(self, **kwargs): - return self._single_agg(Collect, **kwargs) + _deprecate_collect() + return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): return super().aggregate(_translate_arg(arg), **kwargs) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 43ad4f0fee3..ef47ea436c7 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -15,6 +15,7 @@ from dask.utils import funcname import cudf +from cudf.core.groupby.groupby import _deprecate_collect from cudf.utils.nvtx_annotation import _dask_cudf_nvtx_annotate from dask_cudf.sorting import _deprecate_shuffle_kwarg @@ -28,7 +29,7 @@ "sum", "min", "max", - "collect", + list, "first", "last", ) @@ -164,9 +165,10 @@ def max(self, split_every=None, split_out=1): @_dask_cudf_nvtx_annotate @_check_groupby_optimized def collect(self, split_every=None, split_out=1): + _deprecate_collect() return _make_groupby_agg_call( self, - self._make_groupby_method_aggs("collect"), + self._make_groupby_method_aggs(list), split_every, split_out, ) @@ -308,9 +310,10 @@ def max(self, split_every=None, split_out=1): @_dask_cudf_nvtx_annotate @_check_groupby_optimized def collect(self, split_every=None, split_out=1): + _deprecate_collect() return _make_groupby_agg_call( self, - {self._slice: "collect"}, + {self._slice: list}, split_every, split_out, )[self._slice] @@ -472,7 +475,7 @@ def groupby_agg( This aggregation algorithm only supports the following options - * "collect" + * "list" * "count" * "first" * "last" @@ -667,8 +670,8 @@ def _redirect_aggs(arg): sum: "sum", max: "max", min: "min", - list: "collect", - "list": "collect", + "collect": list, + "list": list, } if isinstance(arg, dict): new_arg = dict() @@ -704,7 +707,7 @@ def _aggs_optimized(arg, supported: set): _global_set = set(arg) return bool(_global_set.issubset(supported)) - elif isinstance(arg, str): + elif isinstance(arg, (str, type)): return arg in supported return False @@ -783,6 +786,8 @@ def _tree_node_agg(df, gb_cols, dropna, sort, sep): agg = col.split(sep)[-1] if agg in ("count", "sum"): agg_dict[col] = ["sum"] + elif agg == "list": + agg_dict[col] = [list] elif agg in OPTIMIZED_AGGS: agg_dict[col] = [agg] else: @@ -873,8 +878,8 @@ def _finalize_gb_agg( gb.drop(columns=[sum_name], inplace=True) if "count" not in agg_list: gb.drop(columns=[count_name], inplace=True) - if "collect" in agg_list: - collect_name = _make_name((col, "collect"), sep=sep) + if list in agg_list: + collect_name = _make_name((col, "list"), sep=sep) gb[collect_name] = gb[collect_name].list.concat() # Ensure sorted keys if `sort=True` diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index dc279bfa690..cf916b713b2 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ 
b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -9,6 +9,7 @@ from dask.utils_test import hlg_layer import cudf +from cudf.testing._utils import expect_warning_if import dask_cudf from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized @@ -47,7 +48,13 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +# NOTE: We only want to test aggregation "methods" here, +# so we need to leave out `list`. We also include a +# deprecation check for "collect". +@pytest.mark.parametrize( + "aggregation", + sorted(tuple(set(OPTIMIZED_AGGS) - {list}) + ("collect",)), +) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) @@ -62,8 +69,9 @@ def test_groupby_basic(series, aggregation, pdf): check_dtype = aggregation != "count" - expect = getattr(gdf_grouped, aggregation)() - actual = getattr(ddf_grouped, aggregation)() + with expect_warning_if(aggregation == "collect"): + expect = getattr(gdf_grouped, aggregation)() + actual = getattr(ddf_grouped, aggregation)() if not QUERY_PLANNING_ON: assert_cudf_groupby_layers(actual)
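For downstream users of the deprecated API in PATCH 5/5, a minimal sketch of the migration this change implies (the DataFrame contents here are illustrative, not from the PR):

```python
import cudf

df = cudf.DataFrame({"key": ["a", "a", "b"], "val": [1, 2, 3]})

# Deprecated after this change; emits a FutureWarning
# lists = df.groupby("key").collect()

# Supported equivalent, as suggested by the deprecation message
lists = df.groupby("key").agg(list)
print(lists)
```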