diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4edd6965c4b..8bd4c8d1a63 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -160,7 +160,7 @@ git submodule update --init --remote --recursive ```bash # create the conda environment (assuming in base `cudf` directory) # note: RAPIDS currently doesn't support `channel_priority: strict`; use `channel_priority: flexible` instead -conda env create --name cudf_dev --file conda/environments/cudf_dev_cuda10.0.yml +conda env create --name cudf_dev --file conda/environments/cudf_dev_cuda11.0.yml # activate the environment conda activate cudf_dev ``` @@ -281,8 +281,8 @@ A Dockerfile is provided with a preconfigured conda environment for building and ### Prerequisites * Install [nvidia-docker2](https://github.com/nvidia/nvidia-docker/wiki/Installation-(version-2.0)) for Docker + GPU support -* Verify NVIDIA driver is `410.48` or higher -* Ensure CUDA 10.0+ is installed +* Verify NVIDIA driver is `450.80.02` or higher +* Ensure CUDA 11.0+ is installed ### Usage @@ -309,9 +309,9 @@ flag. Below is a list of the available arguments and their purpose: | Build Argument | Default Value | Other Value(s) | Purpose | | --- | --- | --- | --- | -| `CUDA_VERSION` | 10.0 | 10.1, 10.2 | set CUDA version | -| `LINUX_VERSION` | ubuntu16.04 | ubuntu18.04 | set Ubuntu version | -| `CC` & `CXX` | 5 | 7 | set gcc/g++ version; **NOTE:** gcc7 requires Ubuntu 18.04 | +| `CUDA_VERSION` | 11.0 | 11.1, 11.2.2 | set CUDA version | +| `LINUX_VERSION` | ubuntu18.04 | ubuntu20.04 | set Ubuntu version | +| `CC` & `CXX` | 9 | 10 | set gcc/g++ version | | `CUDF_REPO` | This repo | Forks of cuDF | set git URL to use for `git clone` | | `CUDF_BRANCH` | main | Any branch name | set git branch to checkout of `CUDF_REPO` | | `NUMBA_VERSION` | newest | >=0.40.0 | set numba version | diff --git a/Dockerfile b/Dockerfile index f48ed3646f4..d24c5d05556 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,18 +1,20 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. + # An integration test & dev container which builds and installs cuDF from main -ARG CUDA_VERSION=10.1 +ARG CUDA_VERSION=11.0 ARG CUDA_SHORT_VERSION=${CUDA_VERSION} -ARG LINUX_VERSION=ubuntu16.04 +ARG LINUX_VERSION=ubuntu18.04 FROM nvidia/cuda:${CUDA_VERSION}-devel-${LINUX_VERSION} ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda/lib64:/usr/local/lib -# Needed for cudf.concat(), avoids "OSError: library nvvm not found" -ENV NUMBAPRO_NVVM=/usr/local/cuda/nvvm/lib64/libnvvm.so -ENV NUMBAPRO_LIBDEVICE=/usr/local/cuda/nvvm/libdevice/ ENV DEBIAN_FRONTEND=noninteractive -ARG CC=5 -ARG CXX=5 +ARG CC=9 +ARG CXX=9 RUN apt update -y --fix-missing && \ apt upgrade -y && \ + apt install -y --no-install-recommends software-properties-common && \ + add-apt-repository ppa:ubuntu-toolchain-r/test && \ + apt update -y --fix-missing && \ apt install -y --no-install-recommends \ git \ gcc-${CC} \ @@ -66,18 +68,10 @@ RUN if [ -f /cudf/docker/package_versions.sh ]; \ conda env create --name cudf --file /cudf/conda/environments/cudf_dev_cuda${CUDA_SHORT_VERSION}.yml ; \ fi -# libcudf build/install ENV CC=/usr/bin/gcc-${CC} ENV CXX=/usr/bin/g++-${CXX} -RUN source activate cudf && \ - mkdir -p /cudf/cpp/build && \ - cd /cudf/cpp/build && \ - cmake .. -DCMAKE_INSTALL_PREFIX=${CONDA_PREFIX} && \ - make -j"$(nproc)" install -# cuDF build/install +# libcudf & cudf build/install RUN source activate cudf && \ - cd /cudf/python/cudf && \ - python setup.py build_ext --inplace && \ - python setup.py install && \ - python setup.py install + cd /cudf/ && \ + ./build.sh libcudf cudf diff --git a/conda/environments/cudf_dev_cuda10.1.yml b/conda/environments/cudf_dev_cuda10.1.yml deleted file mode 100644 index 3c26dedda20..00000000000 --- a/conda/environments/cudf_dev_cuda10.1.yml +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. - -name: cudf_dev -channels: - - rapidsai - - nvidia - - rapidsai-nightly - - conda-forge - - defaults -dependencies: - - clang=8.0.1 - - clang-tools=8.0.1 - - cupy>7.1.0,<9.0.0a0 - - rmm=0.20.* - - cmake>=3.14 - - cmake_setuptools>=0.1.3 - - python>=3.7,<3.9 - - numba>=0.49.0,!=0.51.0 - - numpy - - pandas>=1.0,<1.3.0dev0 - - pyarrow=1.0.1 - - fastavro>=0.22.9 - - notebook>=0.5.0 - - cython>=0.29,<0.30 - - fsspec>=0.6.0 - - pytest - - pytest-benchmark - - pytest-xdist - - sphinx - - sphinx_rtd_theme - - sphinxcontrib-websupport - - nbsphinx - - numpydoc - - ipython - - recommonmark - - pandoc=<2.0.0 - - cudatoolkit=10.1 - - pip - - flake8=3.8.3 - - black=19.10 - - isort=5.0.7 - - mypy=0.782 - - typing_extensions - - pre_commit - - dask==2021.4.0 - - distributed>=2.22.0,<=2021.4.0 - - streamz - - dlpack - - arrow-cpp=1.0.1 - - arrow-cpp-proc * cuda - - boost-cpp>=1.72.0 - - double-conversion - - rapidjson - - flatbuffers - - hypothesis - - sphinx-markdown-tables - - sphinx-copybutton - - mimesis - - packaging - - protobuf - - nvtx>=0.2.1 - - cachetools - - pip: - - git+https://github.com/dask/dask.git@main - - git+https://github.com/dask/distributed.git@main - - git+https://github.com/python-streamz/streamz.git - - pyorc diff --git a/conda/environments/cudf_dev_cuda10.2.yml b/conda/environments/cudf_dev_cuda10.2.yml deleted file mode 100644 index cc78894a99c..00000000000 --- a/conda/environments/cudf_dev_cuda10.2.yml +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. - -name: cudf_dev -channels: - - rapidsai - - nvidia - - rapidsai-nightly - - conda-forge - - defaults -dependencies: - - clang=8.0.1 - - clang-tools=8.0.1 - - cupy>7.1.0,<9.0.0a0 - - rmm=0.20.* - - cmake>=3.14 - - cmake_setuptools>=0.1.3 - - python>=3.7,<3.9 - - numba>=0.49,!=0.51.0 - - numpy - - pandas>=1.0,<1.3.0dev0 - - pyarrow=1.0.1 - - fastavro>=0.22.9 - - notebook>=0.5.0 - - cython>=0.29,<0.30 - - fsspec>=0.6.0 - - pytest - - pytest-benchmark - - pytest-xdist - - sphinx - - sphinx_rtd_theme - - sphinxcontrib-websupport - - nbsphinx - - numpydoc - - ipython - - recommonmark - - pandoc=<2.0.0 - - cudatoolkit=10.2 - - pip - - flake8=3.8.3 - - black=19.10 - - isort=5.0.7 - - mypy=0.782 - - typing_extensions - - pre_commit - - dask==2021.4.0 - - distributed>=2.22.0,<=2021.4.0 - - streamz - - dlpack - - arrow-cpp=1.0.1 - - arrow-cpp-proc * cuda - - boost-cpp>=1.72.0 - - double-conversion - - rapidjson - - flatbuffers - - hypothesis - - sphinx-markdown-tables - - sphinx-copybutton - - mimesis - - packaging - - protobuf - - nvtx>=0.2.1 - - cachetools - - pip: - - git+https://github.com/dask/dask.git@main - - git+https://github.com/dask/distributed.git@main - - git+https://github.com/python-streamz/streamz.git - - pyorc diff --git a/cpp/benchmarks/string/json_benchmark.cpp b/cpp/benchmarks/string/json_benchmark.cpp index 6fb6a07a8d0..c6a6b757951 100644 --- a/cpp/benchmarks/string/json_benchmark.cpp +++ b/cpp/benchmarks/string/json_benchmark.cpp @@ -113,7 +113,7 @@ static void BM_case(benchmark::State& state, QueryArg&&... query_arg) std::string json_path(query_arg...); for (auto _ : state) { - cuda_event_timer raii(state, true, 0); + cuda_event_timer raii(state, true); auto result = cudf::strings::get_json_object(scv, json_path); cudaStreamSynchronize(0); } diff --git a/cpp/benchmarks/text/ngrams_benchmark.cpp b/cpp/benchmarks/text/ngrams_benchmark.cpp index 1fe8e3b7f2e..52f55249631 100644 --- a/cpp/benchmarks/text/ngrams_benchmark.cpp +++ b/cpp/benchmarks/text/ngrams_benchmark.cpp @@ -43,7 +43,7 @@ static void BM_ngrams(benchmark::State& state, ngrams_type nt) cudf::strings_column_view input(table->view().column(0)); for (auto _ : state) { - cuda_event_timer raii(state, true, 0); + cuda_event_timer raii(state, true); switch (nt) { case ngrams_type::tokens: nvtext::generate_ngrams(input); break; case ngrams_type::characters: nvtext::generate_character_ngrams(input); break; diff --git a/cpp/benchmarks/text/replace_benchmark.cpp b/cpp/benchmarks/text/replace_benchmark.cpp index f5428aee225..8f6704ab1af 100644 --- a/cpp/benchmarks/text/replace_benchmark.cpp +++ b/cpp/benchmarks/text/replace_benchmark.cpp @@ -54,7 +54,7 @@ static void BM_replace(benchmark::State& state) cudf::test::strings_column_wrapper replacements({"1", "2", "7", "0"}); for (auto _ : state) { - cuda_event_timer raii(state, true, 0); + cuda_event_timer raii(state, true); nvtext::replace_tokens( view, cudf::strings_column_view(targets), cudf::strings_column_view(replacements)); } diff --git a/cpp/docs/TESTING.md b/cpp/docs/TESTING.md index 638f7224ab8..2c7b62b8b6d 100644 --- a/cpp/docs/TESTING.md +++ b/cpp/docs/TESTING.md @@ -1,7 +1,7 @@ # Unit Testing in libcudf Unit tests in libcudf are written using -[Google Test](https://github.com/google/googletest/blob/master/googletest/docs/primer.md). +[Google Test](https://github.com/google/googletest/blob/master/docs/primer.md). **Important:** Instead of including `gtest/gtest.h` directly, use `#include `. @@ -59,7 +59,7 @@ files, and are therefore preferred in test code over `thrust::device_vector`. ## Base Fixture -All libcudf unit tests should make use of a GTest ["Test Fixture"](https://github.com/google/googletest/blob/master/googletest/docs/primer.md#test-fixtures-using-the-same-data-configuration-for-multiple-tests-same-data-multiple-tests). +All libcudf unit tests should make use of a GTest ["Test Fixture"](https://github.com/google/googletest/blob/master/docs/primer.md#test-fixtures-using-the-same-data-configuration-for-multiple-tests-same-data-multiple-tests). Even if the fixture is empty, it should inherit from the base fixture `cudf::test::BaseFixture` found in `include/cudf_test/base_fixture.hpp`. This ensures that RMM is properly initialized and finalized. `cudf::test::BaseFixture` already inherits from `::testing::Test` and therefore it is @@ -75,7 +75,7 @@ class MyTestFiture : public cudf::test::BaseFixture {...}; In general, libcudf features must work across all of the supported types (there are exceptions e.g. not all binary operations are supported for all types). In order to automate the process of running the same tests across multiple types, we use GTest's -[Typed Tests](https://github.com/google/googletest/blob/master/googletest/docs/advanced.md#typed-tests). +[Typed Tests](https://github.com/google/googletest/blob/master/docs/advanced.md#typed-tests). Typed tests allow you to write a test once and run it across a list of types. For example: diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 7cb3db1eb30..f2c57f3a9fa 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -389,9 +389,10 @@ class column_in_metadata { bool _use_int96_timestamp = false; // bool _output_as_binary = false; thrust::optional _decimal_precision; - std::vector children; public: + std::vector children; + /** * @brief Set the name of this column * diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 953872ab7ed..541163eb086 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -357,8 +357,6 @@ static __device__ uint8_t *ext_heap_alloc(uint32_t bytes, first_free_block = atomicExch((unsigned int *)heap_ptr, first_free_block); if (first_free_block == ~0 || first_free_block >= ext_heap_size) { // Some other block is holding the heap or there are no free blocks: try again later - // Wait a bit in an attempt to make the spin less resource-hungry - nanosleep(100); continue; } if (first_free_block == 0) { @@ -408,8 +406,7 @@ static __device__ uint8_t *ext_heap_alloc(uint32_t bytes, } } while (blk_next != 0 && blk_next < ext_heap_size); first_free_block = atomicExch((unsigned int *)heap_ptr, first_free_block); - // Wait a while since reaching here means the heap is full - nanosleep(10000); + // Reaching here means the heap is full // Just in case we're trying to allocate more than the entire heap if (len > ext_heap_size - 4 * sizeof(uint32_t)) { break; } } @@ -429,8 +426,7 @@ static __device__ void ext_heap_free(void *ptr, for (;;) { first_free_block = atomicExch((unsigned int *)heap_ptr, first_free_block); if (first_free_block != ~0) { break; } - // Some other block is holding the heap: wait - nanosleep(50); + // Some other block is holding the heap } if (first_free_block >= ext_heap_size) { // Heap is currently empty diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index a31cf1717e7..eda1d37f78c 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -512,13 +512,10 @@ __device__ void decode_symbols(inflate_state_s *s) #if ENABLE_PREFETCH // Wait for prefetcher to fetch a worst-case of 48 bits per symbol while ((*(volatile int32_t *)&s->pref.cur_p - (int32_t)(size_t)cur < batch_size * 6) || - (s->x.batch_len[batch] != 0)) + (s->x.batch_len[batch] != 0)) {} #else - while (s->x.batch_len[batch] != 0) + while (s->x.batch_len[batch] != 0) {} #endif - { - nanosleep(100); - } batch_len = 0; #if ENABLE_PREFETCH if (cur + (bitpos >> 3) >= end) { @@ -662,7 +659,7 @@ __device__ void decode_symbols(inflate_state_s *s) if (batch_len != 0) batch = (batch + 1) & (batch_count - 1); } while (sym != 256); - while (s->x.batch_len[batch] != 0) { nanosleep(150); } + while (s->x.batch_len[batch] != 0) {} s->x.batch_len[batch] = -1; s->bitbuf = bitbuf; s->bitpos = bitpos; @@ -779,7 +776,7 @@ __device__ void process_symbols(inflate_state_s *s, int t) uint32_t lit_mask; if (t == 0) { - while ((batch_len = s->x.batch_len[batch]) == 0) { nanosleep(100); } + while ((batch_len = s->x.batch_len[batch]) == 0) {} } else { batch_len = 0; } @@ -962,8 +959,6 @@ __device__ void prefetch_warp(volatile inflate_state_s *s, int t) s->pref.cur_p = cur_p; __threadfence_block(); } - } else if (t == 0) { - nanosleep(150); } } } diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 2b799b5e1bf..c58880c9ed8 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -99,7 +99,6 @@ __device__ void snappy_prefetch_bytestream(unsnap_state_s *s, int t) blen = 0; break; } - nanosleep(100); } } blen = shuffle(blen); @@ -281,7 +280,7 @@ __device__ void snappy_decode_symbols(unsnap_state_s *s, uint32_t t) if (t == 0) { s->q.prefetch_rdpos = cur; #pragma unroll(1) // We don't want unrolling here - while (s->q.prefetch_wrpos < min(cur + 5 * batch_size, end)) { nanosleep(50); } + while (s->q.prefetch_wrpos < min(cur + 5 * batch_size, end)) {} b = &s->q.batch[batch * batch_size]; } // Process small symbols in parallel: for data that does not get good compression, @@ -441,7 +440,7 @@ __device__ void snappy_decode_symbols(unsnap_state_s *s, uint32_t t) // Wait for prefetcher s->q.prefetch_rdpos = cur; #pragma unroll(1) // We don't want unrolling here - while (s->q.prefetch_wrpos < min(cur + 5 * batch_size, end)) { nanosleep(50); } + while (s->q.prefetch_wrpos < min(cur + 5 * batch_size, end)) {} dst_pos += blen; if (bytes_left < blen) break; bytes_left -= blen; @@ -457,7 +456,7 @@ __device__ void snappy_decode_symbols(unsnap_state_s *s, uint32_t t) } batch_len = shuffle(batch_len); if (t == 0) { - while (s->q.batch_len[batch] != 0) { nanosleep(100); } + while (s->q.batch_len[batch] != 0) {} } if (batch_len != batch_size) { break; } } @@ -490,7 +489,7 @@ __device__ void snappy_process_symbols(unsnap_state_s *s, int t, Storage &temp_s int32_t batch_len, blen_t, dist_t; if (t == 0) { - while ((batch_len = s->q.batch_len[batch]) == 0) { nanosleep(100); } + while ((batch_len = s->q.batch_len[batch]) == 0) {} } else { batch_len = 0; } diff --git a/cpp/src/io/utilities/block_utils.cuh b/cpp/src/io/utilities/block_utils.cuh index 4c03f9a9ca0..ec5177d70c3 100644 --- a/cpp/src/io/utilities/block_utils.cuh +++ b/cpp/src/io/utilities/block_utils.cuh @@ -36,16 +36,6 @@ inline __device__ void syncwarp(void) { __syncwarp(); } inline __device__ uint32_t ballot(int pred) { return __ballot_sync(~0, pred); } -template -inline __device__ void nanosleep(T d) -{ -#if (__CUDA_ARCH__ >= 700) - __nanosleep(d); -#else - clock(); -#endif -} - // Warp reduction helpers template inline __device__ T WarpReduceOr2(T acc) diff --git a/cpp/src/strings/json/json_path.cu b/cpp/src/strings/json/json_path.cu index cd8aae12070..3b0290736ae 100644 --- a/cpp/src/strings/json/json_path.cu +++ b/cpp/src/strings/json/json_path.cu @@ -945,7 +945,7 @@ std::unique_ptr get_json_object(cudf::strings_column_view const& c rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - return detail::get_json_object(col, json_path, 0, mr); + return detail::get_json_object(col, json_path, rmm::cuda_stream_default, mr); } } // namespace strings diff --git a/cpp/tests/transform/row_bit_count_test.cu b/cpp/tests/transform/row_bit_count_test.cu index 21e5c818197..313113a58e0 100644 --- a/cpp/tests/transform/row_bit_count_test.cu +++ b/cpp/tests/transform/row_bit_count_test.cu @@ -16,8 +16,6 @@ #include #include -#include -#include #include #include #include @@ -47,7 +45,7 @@ TYPED_TEST(RowBitCountTyped, SimpleTypes) // expect size of the type per row auto expected = make_fixed_width_column(data_type{type_id::INT32}, 16); cudf::mutable_column_view mcv(*expected); - thrust::fill(rmm::exec_policy(0), + thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), mcv.begin(), mcv.end(), sizeof(device_storage_type_t) * CHAR_BIT); @@ -70,7 +68,7 @@ TYPED_TEST(RowBitCountTyped, SimpleTypesWithNulls) // expect size of the type + 1 bit per row auto expected = make_fixed_width_column(data_type{type_id::INT32}, 16); cudf::mutable_column_view mcv(*expected); - thrust::fill(rmm::exec_policy(0), + thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), mcv.begin(), mcv.end(), (sizeof(device_storage_type_t) * CHAR_BIT) + 1); @@ -490,7 +488,7 @@ TEST_F(RowBitCount, Table) auto expected = cudf::make_fixed_width_column(data_type{type_id::INT32}, t.num_rows()); cudf::mutable_column_view mcv(*expected); thrust::transform( - rmm::exec_policy(0), + rmm::exec_policy(rmm::cuda_stream_default), thrust::make_counting_iterator(0), thrust::make_counting_iterator(0) + t.num_rows(), mcv.begin(), @@ -586,7 +584,7 @@ TEST_F(RowBitCount, EmptyTable) } { - auto strings = cudf::strings::detail::make_empty_strings_column(0); + auto strings = cudf::make_empty_column(data_type{type_id::STRING}); auto ints = cudf::make_empty_column(data_type{type_id::INT32}); cudf::table_view empty({*strings, *ints}); diff --git a/java/src/main/java/ai/rapids/cudf/ParquetColumnWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetColumnWriterOptions.java new file mode 100644 index 00000000000..2787d65cfdc --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/ParquetColumnWriterOptions.java @@ -0,0 +1,423 @@ +/* + * + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ai.rapids.cudf; + +import java.util.ArrayList; +import java.util.List; + +/** + * Per column settings for writing Parquet files. + */ +class ParquetColumnWriterOptions { + private boolean isTimestampTypeInt96; + private int precision; + private boolean isNullable; + private String columName; + private ParquetColumnWriterOptions(AbstractStructBuilder builder) { + this.columName = builder.name; + this.isNullable = builder.isNullable; + this.childColumnOptions = + (ParquetColumnWriterOptions[]) builder.children.toArray(new ParquetColumnWriterOptions[0]); + } + + /** + * Constructor used for list + */ + private ParquetColumnWriterOptions(ListBuilder builder) { + assert(builder.children.size() == 1) : "Lists can only have one child"; + this.columName = builder.name; + this.isNullable = builder.isNullable; + // we are adding the child twice even though lists have one child only because the way the cudf + // has implemented this it requires two children to be set for the list, but it drops the + // first one. This is something that is a lower priority and might be fixed in future + this.childColumnOptions = + new ParquetColumnWriterOptions[]{DUMMY_CHILD, builder.children.get(0)}; + } + + protected ParquetColumnWriterOptions[] childColumnOptions = {}; + protected abstract static class AbstractStructBuilder + extends NestedBuilder { + /** + * Builder specific to build a Struct meta + * @param name + */ + public AbstractStructBuilder(String name, boolean isNullable) { + super(name, isNullable); + } + + protected AbstractStructBuilder() { + super(); + } + } + + // This child is needed as the first child of a List column meta due to how cudf has been + // implemented. Cudf drops the first child from the meta if a column is a LIST. This is done + // this way due to some complications in the parquet reader. There was change to fix this here: + // https://github.com/rapidsai/cudf/pull/7461/commits/5ce33b40abb87cc7b76b5efeb0a3a0215f9ef6fb + // but it was reverted later on here: + // https://github.com/rapidsai/cudf/pull/7461/commits/f248eb7265de995a95f998d46d897fb0ae47f53e + static ParquetColumnWriterOptions DUMMY_CHILD = new ParquetColumnWriterOptions("DUMMY"); + + static abstract class NestedBuilder { + protected List children = new ArrayList<>(); + protected boolean isNullable = true; + protected String name = ""; + + /** + * Builder specific to build a Struct meta + * @param name + */ + protected NestedBuilder(String name, boolean isNullable) { + this.name = name; + this.isNullable = isNullable; + } + + protected NestedBuilder() {} + + protected ParquetColumnWriterOptions withColumn(String name, boolean isNullable) { + return new ParquetColumnWriterOptions(name, isNullable); + } + + protected ParquetColumnWriterOptions withDecimal(String name, int precision, + boolean isNullable) { + return new ParquetColumnWriterOptions(name, false, precision, isNullable); + } + + protected ParquetColumnWriterOptions withTimestamp(String name, boolean isInt96, + boolean isNullable) { + return new ParquetColumnWriterOptions(name, isInt96, 0, isNullable); + } + + /** + * Set the list column meta. + * Lists should have only one child in ColumnVector, but the metadata expects a + * LIST column to have two children and the first child to be the + * {@link ParquetColumnWriterOptions#DUMMY_CHILD}. + * This is the current behavior in cudf and will change in future + * @param child + * @return this for chaining. + */ + public T withListColumn(ParquetListColumnWriterOptions child) { + assert (child.getChildColumnOptions().length == 2) : "Lists can only have two children"; + if (child.getChildColumnOptions()[0] != DUMMY_CHILD) { + throw new IllegalArgumentException("First child in the list has to be DUMMY_CHILD"); + } + if (child.getChildColumnOptions()[1].getColumName().isEmpty()) { + throw new IllegalArgumentException("Column name can't be empty"); + } + children.add(child); + return (T) this; + } + + /** + * Set a child struct meta data + * @param child + * @return this for chaining. + */ + public T withStructColumn(ParquetStructColumnWriterOptions child) { + for (ParquetColumnWriterOptions opt: child.getChildColumnOptions()) { + if (opt.getColumName().isEmpty()) { + throw new IllegalArgumentException("Column name can't be empty"); + } + } + children.add(child); + return (T) this; + } + + /** + * Set column name + * @param name + */ + public T withNonNullableColumn(String... name) { + withColumn(false, name); + return (T) this; + } + + /** + * Set nullable column meta data + * @param name + */ + public T withNullableColumn(String... name) { + withColumn(true, name); + return (T) this; + } + + /** + * Set a simple child meta data + * @param name + * @return this for chaining. + */ + public T withColumn(boolean nullable, String... name) { + for (String n : name) { + children.add(withColumn(n, nullable)); + } + return (T) this; + } + + /** + * Set a Decimal child meta data + * @param name + * @param precision + * @return this for chaining. + */ + public T withDecimalColumn(String name, int precision, boolean nullable) { + children.add(withDecimal(name, precision, nullable)); + return (T) this; + } + + /** + * Set a Decimal child meta data + * @param name + * @param precision + * @return this for chaining. + */ + public T withNullableDecimalColumn(String name, int precision) { + withDecimalColumn(name, precision, true); + return (T) this; + } + + /** + * Set a Decimal child meta data + * @param name + * @param precision + * @return this for chaining. + */ + public T withDecimalColumn(String name, int precision) { + withDecimalColumn(name, precision, false); + return (T) this; + } + + /** + * Set a timestamp child meta data + * @param name + * @param isInt96 + * @return this for chaining. + */ + public T withTimestampColumn(String name, boolean isInt96, boolean nullable) { + children.add(withTimestamp(name, isInt96, nullable)); + return (T) this; + } + + /** + * Set a timestamp child meta data + * @param name + * @param isInt96 + * @return this for chaining. + */ + public T withTimestampColumn(String name, boolean isInt96) { + withTimestampColumn(name, isInt96, false); + return (T) this; + } + + /** + * Set a timestamp child meta data + * @param name + * @param isInt96 + * @return this for chaining. + */ + public T withNullableTimestampColumn(String name, boolean isInt96) { + withTimestampColumn(name, isInt96, true); + return (T) this; + } + } + + ParquetColumnWriterOptions(String columnName, boolean isTimestampTypeInt96, + int precision, boolean isNullable) { + this.isTimestampTypeInt96 = isTimestampTypeInt96; + this.precision = precision; + this.isNullable = isNullable; + this.columName = columnName; + } + + ParquetColumnWriterOptions(String columnName, boolean isNullable) { + this.isTimestampTypeInt96 = false; + this.precision = 0; + this.isNullable = isNullable; + this.columName = columnName; + } + + ParquetColumnWriterOptions(String columnName) { + this(columnName, true); + } + + boolean[] getFlatIsTimeTypeInt96() { + boolean[] ret = {isTimestampTypeInt96}; + + for (ParquetColumnWriterOptions opt: childColumnOptions) { + boolean[] b = opt.getFlatIsTimeTypeInt96(); + boolean[] tmp = new boolean[ret.length + b.length]; + System.arraycopy(ret, 0, tmp, 0, ret.length); + System.arraycopy(b, 0, tmp, ret.length, b.length); + ret = tmp; + } + return ret; + } + + int[] getFlatPrecision() { + int[] ret = {precision}; + + for (ParquetColumnWriterOptions opt: childColumnOptions) { + int[] b = opt.getFlatPrecision(); + int[] tmp = new int[ret.length + b.length]; + System.arraycopy(ret, 0, tmp, 0, ret.length); + System.arraycopy(b, 0, tmp, ret.length, b.length); + ret = tmp; + } + return ret; + } + + boolean[] getFlatIsNullable() { + boolean[] ret = {isNullable}; + + for (ParquetColumnWriterOptions opt: childColumnOptions) { + boolean[] b = opt.getFlatIsNullable(); + boolean[] tmp = new boolean[ret.length + b.length]; + System.arraycopy(ret, 0, tmp, 0, ret.length); + System.arraycopy(b, 0, tmp, ret.length, b.length); + ret = tmp; + } + return ret; + } + + int[] getFlatNumChildren() { + int[] ret = {childColumnOptions.length}; + + for (ParquetColumnWriterOptions opt: childColumnOptions) { + int[] b = opt.getFlatNumChildren(); + int[] tmp = new int[ret.length + b.length]; + System.arraycopy(ret, 0, tmp, 0, ret.length); + System.arraycopy(b, 0, tmp, ret.length, b.length); + ret = tmp; + } + return ret; + } + + String[] getFlatColumnNames() { + String[] ret = {columName}; + for (ParquetColumnWriterOptions opt: childColumnOptions) { + String[] b = opt.getFlatColumnNames(); + String[] tmp = new String[ret.length + b.length]; + System.arraycopy(ret, 0, tmp, 0, ret.length); + System.arraycopy(b, 0, tmp, ret.length, b.length); + ret = tmp; + } + return ret; + } + + /** + * Creates a ListBuilder for column called 'name' + * @param name + */ + public static ListBuilder listBuilder(String name) { + return new ListBuilder(name, true); + } + + /** + * Creates a ListBuilder for column called 'name' + * @param name + * @param isNullable + */ + public static ListBuilder listBuilder(String name, boolean isNullable) { + return new ListBuilder(name, isNullable); + } + + /** + * Creates a StructBuilder for column called 'name' + * @param name + * @param isNullable + */ + public static StructBuilder structBuilder(String name, boolean isNullable) { + return new StructBuilder(name, isNullable); + } + + /** + * Creates a StructBuilder for column called 'name' + * @param name + */ + public static StructBuilder structBuilder(String name) { + return new StructBuilder(name, true); + } + + /** + * Return if the column can have null values + */ + public String getColumName() { + return columName; + } + + /** + * Return if the column can have null values + */ + public boolean isNullable() { + return isNullable; + } + + /** + * Return the precision for this column + */ + public int getPrecision() { + return precision; + } + + /** + * Returns true if the writer is expected to write timestamps in INT96 + */ + public boolean isTimestampTypeInt96() { + return isTimestampTypeInt96; + } + + /** + * Return the child columnOptions for this column + */ + public ParquetColumnWriterOptions[] getChildColumnOptions() { + return childColumnOptions; + } + + public static class ParquetStructColumnWriterOptions extends ParquetColumnWriterOptions { + protected ParquetStructColumnWriterOptions(AbstractStructBuilder builder) { + super(builder); + } + } + + public static class ParquetListColumnWriterOptions extends ParquetColumnWriterOptions { + protected ParquetListColumnWriterOptions(ListBuilder builder) { + super(builder); + } + } + + public static class StructBuilder extends AbstractStructBuilder { + public StructBuilder(String name, boolean isNullable) { + super(name, isNullable); + } + + public ParquetStructColumnWriterOptions build() { + return new ParquetStructColumnWriterOptions(this); + } + } + + public static class ListBuilder extends NestedBuilder { + public ListBuilder(String name, boolean isNullable) { + super(name, isNullable); + } + + public ParquetListColumnWriterOptions build() { + return new ParquetListColumnWriterOptions(this); + } + } +} diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java index 2e793494b7b..c0882f54251 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java @@ -18,10 +18,33 @@ package ai.rapids.cudf; +import java.util.LinkedHashMap; +import java.util.Map; + /** - * Settings for writing Parquet files. + * This class represents settings for writing Parquet files. It includes meta data information + * that will be used by the Parquet writer to write the file */ -public class ParquetWriterOptions extends CompressedMetadataWriterOptions { +public final class ParquetWriterOptions extends ParquetColumnWriterOptions.ParquetStructColumnWriterOptions { + private final CompressionType compressionType; + private final Map metadata; + private final StatisticsFrequency statsGranularity; + + private ParquetWriterOptions(Builder builder) { + super(builder); + this.statsGranularity = builder.statsGranularity; + this.compressionType = builder.compressionType; + this.metadata = builder.metadata; + } + + String[] getMetadataKeys() { + return metadata.keySet().toArray(new String[metadata.size()]); + } + + String[] getMetadataValues() { + return metadata.values().toArray(new String[metadata.size()]); + } + public enum StatisticsFrequency { /** Do not generate statistics */ NONE(0), @@ -39,32 +62,61 @@ public enum StatisticsFrequency { } } - public static class Builder extends CMWriterBuilder { + public static Builder builder() { + return new Builder(); + } + + public StatisticsFrequency getStatisticsFrequency() { + return statsGranularity; + } + + public CompressionType getCompressionType() { + return compressionType; + } + + public Map getMetadata() { + return metadata; + } + + public static class Builder extends ParquetColumnWriterOptions.AbstractStructBuilder { private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP; - private boolean isTimestampTypeInt96 = false; - private int[] precisionValues = null; + final Map metadata = new LinkedHashMap<>(); + CompressionType compressionType = CompressionType.AUTO; - public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) { - this.statsGranularity = statsGranularity; + public Builder() { + super(); + } + + /** + * Add a metadata key and a value + * @param key + * @param value + */ + public Builder withMetadata(String key, String value) { + this.metadata.put(key, value); return this; } /** - * Set whether the timestamps should be written in INT96 + * Add a map of metadata keys and values + * @param metadata */ - public Builder withTimestampInt96(boolean int96) { - this.isTimestampTypeInt96 = int96; + public Builder withMetadata(Map metadata) { + this.metadata.putAll(metadata); return this; } /** - * This is a temporary hack to make things work. This API will go away once we can update the - * parquet APIs properly. - * @param precisionValues a value for each column, non-decimal columns are ignored. - * @return this for chaining. + * Set the compression type to use for writing + * @param compression */ - public Builder withDecimalPrecisions(int ... precisionValues) { - this.precisionValues = precisionValues; + public Builder withCompressionType(CompressionType compression) { + this.compressionType = compression; + return this; + } + + public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) { + this.statsGranularity = statsGranularity; return this; } @@ -72,40 +124,4 @@ public ParquetWriterOptions build() { return new ParquetWriterOptions(this); } } - - public static Builder builder() { - return new Builder(); - } - - private final StatisticsFrequency statsGranularity; - - private ParquetWriterOptions(Builder builder) { - super(builder); - this.statsGranularity = builder.statsGranularity; - this.isTimestampTypeInt96 = builder.isTimestampTypeInt96; - this.precisions = builder.precisionValues; - } - - public StatisticsFrequency getStatisticsFrequency() { - return statsGranularity; - } - - /** - * Return the flattened list of precisions if set otherwise empty array will be returned. - * For a definition of what `flattened` means please look at {@link Builder#withDecimalPrecisions} - */ - public int[] getPrecisions() { - return precisions; - } - - /** - * Returns true if the writer is expected to write timestamps in INT96 - */ - public boolean isTimestampTypeInt96() { - return isTimestampTypeInt96; - } - - private boolean isTimestampTypeInt96; - - private int[] precisions; } diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 8f256987dd2..5b17621cb42 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -244,6 +244,7 @@ private static native long[] readParquet(String[] filterColumnNames, String file /** * Setup everything to write parquet formatted data to a file. * @param columnNames names that correspond to the table columns + * @param flatNumChildren flattened list of children per column * @param nullable true if the column can have nulls else false * @param metadataKeys Metadata key names to place in the Parquet file * @param metadataValues Metadata values corresponding to metadataKeys @@ -256,18 +257,20 @@ private static native long[] readParquet(String[] filterColumnNames, String file * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetFileBegin(String[] columnNames, + int[] flatNumChildren, boolean[] nullable, String[] metadataKeys, String[] metadataValues, int compression, int statsFreq, - boolean isInt96, + boolean[] isInt96, int[] precisions, String filename) throws CudfException; /** * Setup everything to write parquet formatted data to a buffer. * @param columnNames names that correspond to the table columns + * @param flatNumChildren flattened list of children per column * @param nullable true if the column can have nulls else false * @param metadataKeys Metadata key names to place in the Parquet file * @param metadataValues Metadata values corresponding to metadataKeys @@ -280,12 +283,13 @@ private static native long writeParquetFileBegin(String[] columnNames, * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetBufferBegin(String[] columnNames, + int[] flatNumChildren, boolean[] nullable, String[] metadataKeys, String[] metadataValues, int compression, int statsFreq, - boolean isInt96, + boolean[] isInt96, int[] precisions, HostBufferConsumer consumer) throws CudfException; @@ -819,35 +823,43 @@ private static class ParquetTableWriter implements TableWriter { HostBufferConsumer consumer; private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { - int numColumns = options.getColumnNames().length; - assert (numColumns == options.getColumnNullability().length); - int[] precisions = options.getPrecisions(); - if (precisions != null) { - assert (numColumns >= options.getPrecisions().length); - } + String[] columnNames = options.getFlatColumnNames(); + boolean[] columnNullabilities = options.getFlatIsNullable(); + boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96(); + int[] precisions = options.getFlatPrecision(); + int[] flatNumChildren = options.getFlatNumChildren(); + this.consumer = null; - this.handle = writeParquetFileBegin(options.getColumnNames(), - options.getColumnNullability(), + this.handle = writeParquetFileBegin(columnNames, + flatNumChildren, + columnNullabilities, options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, options.getStatisticsFrequency().nativeId, - options.isTimestampTypeInt96(), - options.getPrecisions(), + timeInt96Values, + precisions, outputFile.getAbsolutePath()); } private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer consumer) { - this.handle = writeParquetBufferBegin(options.getColumnNames(), - options.getColumnNullability(), + String[] columnNames = options.getFlatColumnNames(); + boolean[] columnNullabilities = options.getFlatIsNullable(); + boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96(); + int[] precisions = options.getFlatPrecision(); + int[] flatNumChildren = options.getFlatNumChildren(); + + this.consumer = consumer; + this.handle = writeParquetBufferBegin(columnNames, + flatNumChildren, + columnNullabilities, options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, options.getStatisticsFrequency().nativeId, - options.isTimestampTypeInt96(), - options.getPrecisions(), + timeInt96Values, + precisions, consumer); - this.consumer = consumer; } @Override diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 346ae8435cc..7d981229906 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -37,9 +37,11 @@ #include #include #include +#include "cudf/types.hpp" #include "cudf_jni_apis.hpp" #include "dtype_utils.hpp" +#include "jni_utils.hpp" #include "row_conversion.hpp" #include @@ -1068,56 +1070,102 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet( CATCH_STD(env, NULL); } +int set_column_metadata(cudf::io::column_in_metadata &column_metadata, + std::vector &col_names, + cudf::jni::native_jbooleanArray &nullability, + cudf::jni::native_jbooleanArray &isInt96, + cudf::jni::native_jintArray &precisions, + cudf::jni::native_jintArray &children, int read_index) { + int write_index = 0; + int num_children = children[read_index++]; + column_metadata.children.resize(num_children); + for (int i = 0 ; i < num_children; i++, write_index++) { + column_metadata.child(write_index) + .set_name(col_names[read_index]) + .set_decimal_precision(precisions[read_index]) + .set_int96_timestamps(isInt96[read_index]) + .set_nullability(nullability[read_index]); + if (children[read_index] > 0) { + read_index = set_column_metadata(column_metadata.child(write_index), col_names, nullability, + isInt96, precisions, children, read_index); + } + else { + read_index++; + } + } + return read_index; +} + +void createTableMetaData(JNIEnv *env, jobjectArray &j_col_names, jintArray &j_children, + jbooleanArray &j_col_nullability, jobjectArray &j_metadata_keys, + jobjectArray &j_metadata_values, jint j_compression, jint j_stats_freq, + jbooleanArray &j_isInt96, jintArray &j_precisions, + cudf::io::table_input_metadata& metadata) { + cudf::jni::auto_set_device(env); + cudf::jni::native_jstringArray col_names(env, j_col_names); + cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability); + cudf::jni::native_jbooleanArray isInt96(env, j_isInt96); + cudf::jni::native_jstringArray meta_keys(env, j_metadata_keys); + cudf::jni::native_jstringArray meta_values(env, j_metadata_values); + cudf::jni::native_jintArray precisions(env, j_precisions); + cudf::jni::native_jintArray children(env, j_children); + + auto cpp_names = col_names.as_cpp_vector(); + + int top_level_children = + children[0]; // this should never be 0, because a table must have columns + + // first value are dummy when reading + // but we write at index 0 + metadata.column_metadata.resize(top_level_children); + int read_index = 1; // the read_index, which will be used to read the arrays + for (int i = read_index, write_index = 0; i <= top_level_children; i++, write_index++) { + metadata.column_metadata[write_index] + .set_name(cpp_names[read_index]) + .set_nullability(col_nullability[read_index]) + .set_int96_timestamps(isInt96[read_index]) + .set_decimal_precision(precisions[read_index]); + + if (children[read_index] > 0) { + read_index = set_column_metadata(metadata.column_metadata[write_index], cpp_names, + col_nullability, isInt96, precisions, children, read_index); + } else { + read_index++; + } + } + for (auto i = 0; i < meta_keys.size(); ++i) { + metadata.user_data[meta_keys[i].get()] = meta_values[i].get(); + } + +} + JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin( - JNIEnv *env, jclass, jobjectArray j_col_names, jbooleanArray j_col_nullability, - jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, - jint j_stats_freq, jboolean j_isInt96, jintArray j_precisions, jobject consumer) { + JNIEnv *env, jclass, jobjectArray j_col_names, jintArray j_children, + jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, + jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, + jobject consumer) { JNI_NULL_CHECK(env, j_col_names, "null columns", 0); JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0); JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0); JNI_NULL_CHECK(env, j_metadata_values, "null metadata values", 0); JNI_NULL_CHECK(env, consumer, "null consumer", 0); try { - cudf::jni::auto_set_device(env); - using namespace cudf::io; - cudf::jni::native_jstringArray col_names(env, j_col_names); - cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability); - cudf::jni::native_jstringArray meta_keys(env, j_metadata_keys); - cudf::jni::native_jstringArray meta_values(env, j_metadata_values); - cudf::jni::native_jintArray precisions(env, j_precisions); - - auto cpp_names = col_names.as_cpp_vector(); - table_input_metadata metadata; - metadata.column_metadata.resize(col_nullability.size()); - for (int i = 0; i < col_nullability.size(); i++) { - metadata.column_metadata[i] - .set_name(cpp_names[i]) - .set_nullability(col_nullability[i]) - .set_int96_timestamps(j_isInt96); - } - - // Precisions is not always set - for (int i = 0; i < precisions.size(); i++) { - metadata.column_metadata[i] - .set_decimal_precision(precisions[i]); - } - - for (auto i = 0; i < meta_keys.size(); ++i) { - metadata.user_data[meta_keys[i].get()] = meta_values[i].get(); - } - std::unique_ptr data_sink( new cudf::jni::jni_writer_data_sink(env, consumer)); + + using namespace cudf::io; sink_info sink{data_sink.get()}; - std::vector const v_precisions( - precisions.data(), precisions.data() + precisions.size()); + table_input_metadata metadata; + createTableMetaData(env, j_col_names, j_children, j_col_nullability, j_metadata_keys, + j_metadata_values, j_compression, j_stats_freq, j_isInt96, j_precisions, + metadata); + chunked_parquet_writer_options opts = chunked_parquet_writer_options::builder(sink) .metadata(&metadata) .compression(static_cast(j_compression)) .stats_level(static_cast(j_stats_freq)) .build(); - auto writer_ptr = std::make_unique(opts); cudf::jni::native_parquet_writer_handle *ret = new cudf::jni::native_parquet_writer_handle(std::move(writer_ptr), std::move(data_sink)); @@ -1127,44 +1175,22 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin( } JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetFileBegin( - JNIEnv *env, jclass, jobjectArray j_col_names, jbooleanArray j_col_nullability, - jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, - jint j_stats_freq, jboolean j_isInt96, jintArray j_precisions, jstring j_output_path) { + JNIEnv *env, jclass, jobjectArray j_col_names, jintArray j_children, + jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, + jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, + jstring j_output_path) { JNI_NULL_CHECK(env, j_col_names, "null columns", 0); JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0); JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0); JNI_NULL_CHECK(env, j_metadata_values, "null metadata values", 0); JNI_NULL_CHECK(env, j_output_path, "null output path", 0); try { - cudf::jni::auto_set_device(env); - using namespace cudf::io; - cudf::jni::native_jstringArray col_names(env, j_col_names); - cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability); - cudf::jni::native_jstringArray meta_keys(env, j_metadata_keys); - cudf::jni::native_jstringArray meta_values(env, j_metadata_values); cudf::jni::native_jstring output_path(env, j_output_path); - cudf::jni::native_jintArray precisions(env, j_precisions); - auto cpp_names = col_names.as_cpp_vector(); + using namespace cudf::io; table_input_metadata metadata; - metadata.column_metadata.resize(col_nullability.size()); - for (int i = 0; i < col_nullability.size(); i++) { - metadata.column_metadata[i] - .set_name(cpp_names[i]) - .set_nullability(col_nullability[i]) - .set_int96_timestamps(j_isInt96); - } - - // Precisions is not always set - for (int i = 0; i < precisions.size(); i++) { - metadata.column_metadata[i] - .set_decimal_precision(precisions[i]); - } - - for (auto i = 0; i < meta_keys.size(); ++i) { - metadata.user_data[meta_keys[i].get()] = meta_values[i].get(); - } - + createTableMetaData(env, j_col_names, j_children, j_col_nullability, j_metadata_keys, + j_metadata_values, j_compression, j_stats_freq, j_isInt96, j_precisions, metadata); sink_info sink{output_path.get()}; chunked_parquet_writer_options opts = chunked_parquet_writer_options::builder(sink) @@ -1833,8 +1859,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_crossJoin(JNIEnv *env, jc JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_interleaveColumns(JNIEnv *env, jclass, jlongArray j_cudf_table_view) { - JNI_NULL_CHECK(env, j_cudf_table_view, "table is null", 0); - try { + JNI_NULL_CHECK(env, j_cudf_table_view, "table is null", 0); try { cudf::jni::auto_set_device(env); cudf::table_view *table_view = reinterpret_cast(j_cudf_table_view); std::unique_ptr result = cudf::interleave_columns(*table_view); diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu index a10ba9a2700..a0938ddb2b5 100644 --- a/java/src/main/native/src/row_conversion.cu +++ b/java/src/main/native/src/row_conversion.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -404,7 +404,7 @@ static std::unique_ptr fixed_width_convert_to_rows( input_data->data(), input_nm->data(), data->mutable_view().data()); return cudf::make_lists_column(num_rows, std::move(offsets), std::move(data), 0, - rmm::device_buffer{0, 0, mr}, stream, mr); + rmm::device_buffer{0, rmm::cuda_stream_default, mr}, stream, mr); } static cudf::data_type get_data_type(const cudf::column_view &v) { diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 8b7ece5d60b..a047158d977 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -50,6 +50,9 @@ import static ai.rapids.cudf.Aggregate.max; import static ai.rapids.cudf.Aggregate.first; import static ai.rapids.cudf.Aggregate.last; +import static ai.rapids.cudf.ParquetColumnWriterOptions.*; +import static ai.rapids.cudf.ParquetWriterOptions.listBuilder; +import static ai.rapids.cudf.ParquetWriterOptions.structBuilder; import static ai.rapids.cudf.Table.TestBuilder; import static ai.rapids.cudf.Table.count; import static ai.rapids.cudf.Table.mean; @@ -4545,36 +4548,42 @@ void testTableBasedFilter() { } private Table getExpectedFileTable() { - return getExpectedFileTable(false); + return getExpectedFileTable(false, false); } private Table getExpectedFileTable(boolean withNestedColumns) { + return getExpectedFileTable(true, true); + } + + private Table getExpectedFileTable(boolean withStructColumns, boolean withListColumn) { TestBuilder tb = new TestBuilder() - .column(true, false, false, true, false) - .column(5, 1, 0, 2, 7) - .column(new Byte[]{2, 3, 4, 5, 9}) - .column(3l, 9l, 4l, 2l, 20l) - .column("this", "is", "a", "test", "string") - .column(1.0f, 3.5f, 5.9f, 7.1f, 9.8f) - .column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d); - if (withNestedColumns) { - StructType nestedType = new StructType(true, - new BasicType(false, DType.INT32), new BasicType(false, DType.STRING)); + .column(true, false, false, true, false) + .column(5, 1, 0, 2, 7) + .column(new Byte[]{2, 3, 4, 5, 9}) + .column(3l, 9l, 4l, 2l, 20l) + .column("this", "is", "a", "test", "string") + .column(1.0f, 3.5f, 5.9f, 7.1f, 9.8f) + .column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d); + StructType nestedType = new StructType(true, + new BasicType(false, DType.INT32), new BasicType(false, DType.STRING)); + if (withStructColumns) { tb.column(nestedType, - struct(1, "k1"), struct(2, "k2"), struct(3, "k3"), - struct(4, "k4"), new HostColumnVector.StructData((List) null)) - .column(new ListType(false, new BasicType(false, DType.INT32)), - Arrays.asList(1, 2), - Arrays.asList(3, 4), - Arrays.asList(5), - Arrays.asList(6, 7), - Arrays.asList(8, 9, 10)) - .column(new ListType(false, nestedType), - Arrays.asList(struct(1, "k1"), struct(2, "k2"), struct(3, "k3")), - Arrays.asList(struct(4, "k4"), struct(5, "k5")), - Arrays.asList(struct(6, "k6")), - Arrays.asList(new HostColumnVector.StructData((List) null)), - Arrays.asList()); + struct(1, "k1"), struct(2, "k2"), struct(3, "k3"), + struct(4, "k4"), new HostColumnVector.StructData((List) null)); + } + if (withListColumn) { + tb.column(new ListType(false, new BasicType(false, DType.INT32)), + Arrays.asList(1, 2), + Arrays.asList(3, 4), + Arrays.asList(5), + Arrays.asList(6, 7), + Arrays.asList(8, 9, 10)) + .column(new ListType(false, nestedType), + Arrays.asList(struct(1, "k1"), struct(2, "k2"), struct(3, "k3")), + Arrays.asList(struct(4, "k4"), struct(5, "k5")), + Arrays.asList(struct(6, "k6")), + Arrays.asList(new HostColumnVector.StructData((List) null)), + Arrays.asList()); } return tb.build(); } @@ -4642,9 +4651,9 @@ void testParquetWriteToBufferChunkedInt96() { try (Table table0 = getExpectedFileTableWithDecimals(); MyBufferConsumer consumer = new MyBufferConsumer()) { ParquetWriterOptions options = ParquetWriterOptions.builder() - .withColumnNames("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c9") - .withTimestampInt96(true) - .withDecimalPrecisions(0, 0, 0, 0, 0, 0, 0, 5, 5) + .withNonNullableColumn("_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6") + .withDecimalColumn("_c7", 5) + .withDecimalColumn("_c8", 5) .build(); try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { @@ -4659,13 +4668,47 @@ void testParquetWriteToBufferChunkedInt96() { } } + @Test + void testParquetWriteToBufferChunkedWithNested() { + ParquetWriterOptions options = ParquetWriterOptions.builder() + .withNullableColumn("_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6") + .withStructColumn(structBuilder("_c7") + .withNullableColumn("_c7-1") + .withNullableColumn("_c7-2") + .build()) + .withListColumn(listBuilder("_c8") + .withNullableColumn("c8-1").build()) + .withListColumn(listBuilder("c9") + .withStructColumn(structBuilder("c9-1") + .withNullableColumn("c9-1-1") + .withNullableColumn("c9-1-2").build()) + .build()) + .build(); + try (Table table0 = getExpectedFileTable(true); + MyBufferConsumer consumer = new MyBufferConsumer()) { + try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { + writer.write(table0); + writer.write(table0); + writer.write(table0); + } + try (Table table1 = Table.readParquet(ParquetOptions.DEFAULT, consumer.buffer, 0, + consumer.offset); + Table concat = Table.concatenate(table0, table0, table0)) { + assertTablesAreEqual(concat, table1); + } + } + } + @Test void testParquetWriteToBufferChunked() { ParquetWriterOptions options = ParquetWriterOptions.builder() - .withColumnNames("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7") - .withTimestampInt96(true) + .withNullableColumn("_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6") + .withStructColumn(structBuilder("_c7") + .withNullableColumn("_c7-1") + .withNullableColumn("_c7-2") + .build()) .build(); - try (Table table0 = getExpectedFileTable(); + try (Table table0 = getExpectedFileTable(true, false); MyBufferConsumer consumer = new MyBufferConsumer()) { try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { writer.write(table0); @@ -4684,11 +4727,11 @@ void testParquetWriteToFileWithNames() throws IOException { File tempFile = File.createTempFile("test-names", ".parquet"); try (Table table0 = getExpectedFileTableWithDecimals()) { ParquetWriterOptions options = ParquetWriterOptions.builder() - .withColumnNames("first", "second", "third", "fourth", "fifth", "sixth", "seventh", - "eighth", "nineth") + .withNonNullableColumn("first", "second", "third", "fourth", "fifth", "sixth", "seventh") + .withDecimalColumn("eighth", 5) + .withDecimalColumn("nineth", 6) .withCompressionType(CompressionType.NONE) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) - .withDecimalPrecisions(0, 0, 0, 0, 0, 0, 0, 5, 6) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { writer.write(table0); @@ -4706,12 +4749,12 @@ void testParquetWriteToFileWithNamesAndMetadata() throws IOException { File tempFile = File.createTempFile("test-names-metadata", ".parquet"); try (Table table0 = getExpectedFileTableWithDecimals()) { ParquetWriterOptions options = ParquetWriterOptions.builder() - .withColumnNames("first", "second", "third", "fourth", "fifth", "sixth", "seventh", - "eighth", "nineth") + .withNonNullableColumn("first", "second", "third", "fourth", "fifth", "sixth", "seventh") + .withDecimalColumn("eighth", 6) + .withDecimalColumn("nineth", 8) .withMetadata("somekey", "somevalue") .withCompressionType(CompressionType.NONE) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) - .withDecimalPrecisions(0, 0, 0, 0, 0, 0, 0, 6, 8) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { writer.write(table0); @@ -4729,10 +4772,11 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { File tempFile = File.createTempFile("test-uncompressed", ".parquet"); try (Table table0 = getExpectedFileTableWithDecimals()) { ParquetWriterOptions options = ParquetWriterOptions.builder() - .withColumnNames("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c9") + .withNonNullableColumn("_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6") + .withDecimalColumn("_c7", 4) + .withDecimalColumn("_c8", 6) .withCompressionType(CompressionType.NONE) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) - .withDecimalPrecisions(0, 0, 0, 0, 0, 0, 0, 4, 6) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { writer.write(table0); diff --git a/python/cudf/cudf/_lib/aggregation.pxd b/python/cudf/cudf/_lib/aggregation.pxd index bb332c44237..972f95d5aab 100644 --- a/python/cudf/cudf/_lib/aggregation.pxd +++ b/python/cudf/cudf/_lib/aggregation.pxd @@ -4,7 +4,7 @@ from libcpp.memory cimport unique_ptr from cudf._lib.cpp.aggregation cimport aggregation -cdef unique_ptr[aggregation] make_aggregation(op, kwargs=*) except * - cdef class Aggregation: cdef unique_ptr[aggregation] c_obj + +cdef Aggregation make_aggregation(op, kwargs=*) diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx index 7138bb49743..682d8cbf329 100644 --- a/python/cudf/cudf/_lib/aggregation.pyx +++ b/python/cudf/cudf/_lib/aggregation.pyx @@ -56,85 +56,55 @@ class AggregationKind(Enum): cdef class Aggregation: - def __init__(self, op, **kwargs): - self.c_obj = move(make_aggregation(op, kwargs)) - + """A Cython wrapper for aggregations. + + **This class should never be instantiated using a standard constructor, + only using one of its many factories.** These factories handle mapping + different cudf operations to their libcudf analogs, e.g. + `cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform + any additional configuration needed to translate Python arguments into + their corresponding C++ types (for instance, C++ enumerations used for + flag arguments). The factory approach is necessary to support operations + like `df.agg(lambda x: x.sum())`; such functions are called with this + class as an argument to generation the desired aggregation. + """ @property def kind(self): - return AggregationKind(self.c_obj.get()[0].kind).name.lower() - - -cdef unique_ptr[aggregation] make_aggregation(op, kwargs={}) except *: - """ - Parameters - ---------- - op : str or callable - If callable, must meet one of the following requirements: - - * Is of the form lambda x: x.agg(*args, **kwargs), where - `agg` is the name of a supported aggregation. Used to - to specify aggregations that take arguments, e.g., - `lambda x: x.quantile(0.5)`. - * Is a user defined aggregation function that operates on - group values. In this case, the output dtype must be - specified in the `kwargs` dictionary. - - Returns - ------- - unique_ptr[aggregation] - """ - cdef Aggregation agg - if isinstance(op, str): - agg = getattr(_AggregationFactory, op)(**kwargs) - elif callable(op): - if op is list: - agg = _AggregationFactory.collect() - elif "dtype" in kwargs: - agg = _AggregationFactory.from_udf(op, **kwargs) - else: - agg = op(_AggregationFactory) - else: - raise TypeError("Unknown aggregation {}".format(op)) - return move(agg.c_obj) - -# The Cython pattern below enables us to create an Aggregation -# without ever calling its `__init__` method, which would otherwise -# result in a RecursionError. -cdef class _AggregationFactory: + return AggregationKind(self.c_obj.get()[0].kind).name @classmethod def sum(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_sum_aggregation()) return agg @classmethod def min(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_min_aggregation()) return agg @classmethod def max(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_max_aggregation()) return agg @classmethod def idxmin(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_argmin_aggregation()) return agg @classmethod def idxmax(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_argmax_aggregation()) return agg @classmethod def mean(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_mean_aggregation()) return agg @@ -146,7 +116,7 @@ cdef class _AggregationFactory: else: c_null_handling = libcudf_types.null_policy.INCLUDE - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_count_aggregation( c_null_handling )) @@ -154,7 +124,7 @@ cdef class _AggregationFactory: @classmethod def size(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_count_aggregation( ( NullHandling.INCLUDE @@ -164,13 +134,13 @@ cdef class _AggregationFactory: @classmethod def nunique(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_nunique_aggregation()) return agg @classmethod def nth(cls, libcudf_types.size_type size): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move( libcudf_aggregation.make_nth_element_aggregation(size) ) @@ -178,49 +148,49 @@ cdef class _AggregationFactory: @classmethod def any(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_any_aggregation()) return agg @classmethod def all(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_all_aggregation()) return agg @classmethod def product(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_product_aggregation()) return agg @classmethod def sum_of_squares(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_sum_of_squares_aggregation()) return agg @classmethod def var(cls, ddof=1): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_variance_aggregation(ddof)) return agg @classmethod def std(cls, ddof=1): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_std_aggregation(ddof)) return agg @classmethod def median(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_median_aggregation()) return agg @classmethod def quantile(cls, q=0.5, interpolation="linear"): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() if not pd.api.types.is_list_like(q): q = [q] @@ -240,19 +210,19 @@ cdef class _AggregationFactory: @classmethod def collect(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_collect_list_aggregation()) return agg @classmethod def unique(cls): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() agg.c_obj = move(libcudf_aggregation.make_collect_set_aggregation()) return agg @classmethod def from_udf(cls, op, *args, **kwargs): - cdef Aggregation agg = Aggregation.__new__(Aggregation) + cdef Aggregation agg = cls() cdef libcudf_types.type_id tid cdef libcudf_types.data_type out_dtype @@ -282,3 +252,42 @@ cdef class _AggregationFactory: libcudf_aggregation.udf_type.PTX, cpp_str, out_dtype )) return agg + + +cdef Aggregation make_aggregation(op, kwargs=None): + r""" + Parameters + ---------- + op : str or callable + If callable, must meet one of the following requirements: + + * Is of the form lambda x: x.agg(*args, **kwargs), where + `agg` is the name of a supported aggregation. Used to + to specify aggregations that take arguments, e.g., + `lambda x: x.quantile(0.5)`. + * Is a user defined aggregation function that operates on + group values. In this case, the output dtype must be + specified in the `kwargs` dictionary. + \*\*kwargs : dict, optional + Any keyword arguments to be passed to the op. + + Returns + ------- + Aggregation + """ + if kwargs is None: + kwargs = {} + + cdef Aggregation agg + if isinstance(op, str): + agg = getattr(Aggregation, op)(**kwargs) + elif callable(op): + if op is list: + agg = Aggregation.collect() + elif "dtype" in kwargs: + agg = Aggregation.from_udf(op, **kwargs) + else: + agg = op(Aggregation) + else: + raise TypeError(f"Unknown aggregation {op}") + return agg diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index 4584841dd33..3c2b541f728 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -1,6 +1,15 @@ # Copyright (c) 2020, NVIDIA CORPORATION. from collections import defaultdict +from pandas.core.groupby.groupby import DataError +from cudf.utils.dtypes import ( + is_categorical_dtype, + is_string_dtype, + is_list_dtype, + is_interval_dtype, + is_struct_dtype, + is_decimal_dtype, +) import numpy as np import rmm @@ -13,56 +22,23 @@ from libcpp cimport bool from cudf._lib.column cimport Column from cudf._lib.table cimport Table -from cudf._lib.aggregation cimport make_aggregation, Aggregation +from cudf._lib.aggregation cimport Aggregation, make_aggregation from cudf._lib.cpp.table.table cimport table, table_view cimport cudf._lib.cpp.types as libcudf_types cimport cudf._lib.cpp.groupby as libcudf_groupby -cimport cudf._lib.cpp.aggregation as libcudf_aggregation # The sets below define the possible aggregations that can be performed on -# different dtypes. The uppercased versions of these strings correspond to -# elements of the AggregationKind enum. -_CATEGORICAL_AGGS = { - "count", - "size", - "nunique", - "unique", -} - -_STRING_AGGS = { - "count", - "size", - "max", - "min", - "nunique", - "nth", - "collect", - "unique", -} - -_LIST_AGGS = { - "collect", -} - -_STRUCT_AGGS = { -} - -_INTERVAL_AGGS = { -} - -_DECIMAL_AGGS = { - "count", - "sum", - "argmin", - "argmax", - "min", - "max", - "nunique", - "nth", - "collect" -} +# different dtypes. These strings must be elements of the AggregationKind enum. +_CATEGORICAL_AGGS = {"COUNT", "SIZE", "NUNIQUE", "UNIQUE"} +_STRING_AGGS = {"COUNT", "SIZE", "MAX", "MIN", "NUNIQUE", "NTH", "COLLECT", + "UNIQUE"} +_LIST_AGGS = {"COLLECT"} +_STRUCT_AGGS = set() +_INTERVAL_AGGS = set() +_DECIMAL_AGGS = {"COUNT", "SUM", "ARGMIN", "ARGMAX", "MIN", "MAX", "NUNIQUE", + "NTH", "COLLECT"} cdef class GroupBy: @@ -132,21 +108,51 @@ cdef class GroupBy: """ from cudf.core.column_accessor import ColumnAccessor cdef vector[libcudf_groupby.aggregation_request] c_agg_requests + cdef libcudf_groupby.aggregation_request c_agg_request cdef Column col + cdef Aggregation agg_obj - aggregations = _drop_unsupported_aggs(values, aggregations) + allow_empty = all(len(v) == 0 for v in aggregations.values()) + included_aggregations = defaultdict(list) for i, (col_name, aggs) in enumerate(aggregations.items()): col = values._data[col_name] - c_agg_requests.push_back( - move(libcudf_groupby.aggregation_request()) + dtype = col.dtype + + valid_aggregations = ( + _LIST_AGGS if is_list_dtype(dtype) + else _STRING_AGGS if is_string_dtype(dtype) + else _CATEGORICAL_AGGS if is_categorical_dtype(dtype) + else _STRING_AGGS if is_struct_dtype(dtype) + else _INTERVAL_AGGS if is_interval_dtype(dtype) + else _DECIMAL_AGGS if is_decimal_dtype(dtype) + else "ALL" ) - c_agg_requests[i].values = col.view() + if (valid_aggregations is _DECIMAL_AGGS + and rmm._cuda.gpu.runtimeGetVersion() < 11000): + raise RuntimeError( + "Decimal aggregations are only supported on CUDA >= 11 " + "due to an nvcc compiler bug." + ) + + c_agg_request = move(libcudf_groupby.aggregation_request()) for agg in aggs: - c_agg_requests[i].aggregations.push_back( - move(make_aggregation(agg)) + agg_obj = make_aggregation(agg) + if (valid_aggregations == "ALL" + or agg_obj.kind in valid_aggregations): + included_aggregations[col_name].append(agg) + c_agg_request.aggregations.push_back( + move(agg_obj.c_obj) + ) + if not c_agg_request.aggregations.empty(): + c_agg_request.values = col.view() + c_agg_requests.push_back( + move(c_agg_request) ) + if c_agg_requests.empty() and not allow_empty: + raise DataError("All requested aggregations are unsupported.") + cdef pair[ unique_ptr[table], vector[libcudf_groupby.aggregation_result] @@ -176,81 +182,14 @@ cdef class GroupBy: ) result_data = ColumnAccessor(multiindex=True) - for i, col_name in enumerate(aggregations): - for j, agg_name in enumerate(aggregations[col_name]): + # Note: This loop relies on the included_aggregations dict being + # insertion ordered to map results to requested aggregations by index. + for i, col_name in enumerate(included_aggregations): + for j, agg_name in enumerate(included_aggregations[col_name]): if callable(agg_name): agg_name = agg_name.__name__ result_data[(col_name, agg_name)] = ( Column.from_unique_ptr(move(c_result.second[i].results[j])) ) - result = Table(data=result_data, index=grouped_keys) - return result - - -def _drop_unsupported_aggs(Table values, aggs): - """ - Drop any aggregations that are not supported. - """ - from pandas.core.groupby.groupby import DataError - - if all(len(v) == 0 for v in aggs.values()): - return aggs - - from cudf.utils.dtypes import ( - is_categorical_dtype, - is_string_dtype, - is_list_dtype, - is_interval_dtype, - is_struct_dtype, - is_decimal_dtype, - ) - result = aggs.copy() - - for col_name in aggs: - if ( - is_list_dtype(values._data[col_name].dtype) - ): - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _LIST_AGGS: - del result[col_name][i] - elif ( - is_string_dtype(values._data[col_name].dtype) - ): - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _STRING_AGGS: - del result[col_name][i] - elif ( - is_categorical_dtype(values._data[col_name].dtype) - ): - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _CATEGORICAL_AGGS: - del result[col_name][i] - elif ( - is_struct_dtype(values._data[col_name].dtype) - ): - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _STRUCT_AGGS: - del result[col_name][i] - elif ( - is_interval_dtype(values._data[col_name].dtype) - ): - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _INTERVAL_AGGS: - del result[col_name][i] - elif ( - is_decimal_dtype(values._data[col_name].dtype) - ): - if rmm._cuda.gpu.runtimeGetVersion() < 11000: - raise RuntimeError( - "Decimal aggregations are only supported on CUDA >= 11 " - "due to an nvcc compiler bug." - ) - for i, agg_name in enumerate(aggs[col_name]): - if Aggregation(agg_name).kind not in _DECIMAL_AGGS: - del result[col_name][i] - - if all(len(v) == 0 for v in result.values()): - raise DataError("No numeric types to aggregate") - - return result + return Table(data=result_data, index=grouped_keys) diff --git a/python/cudf/cudf/_lib/reduce.pyx b/python/cudf/cudf/_lib/reduce.pyx index 62013ea88ae..e5723331f3c 100644 --- a/python/cudf/cudf/_lib/reduce.pyx +++ b/python/cudf/cudf/_lib/reduce.pyx @@ -12,7 +12,7 @@ from cudf._lib.scalar cimport DeviceScalar from cudf._lib.column cimport Column from cudf._lib.types import np_to_cudf_types from cudf._lib.types cimport underlying_type_t_type_id, dtype_to_data_type -from cudf._lib.aggregation cimport make_aggregation, aggregation +from cudf._lib.aggregation cimport make_aggregation, Aggregation from libcpp.memory cimport unique_ptr from libcpp.utility cimport move, pair import numpy as np @@ -45,9 +45,7 @@ def reduce(reduction_op, Column incol, dtype=None, **kwargs): cdef column_view c_incol_view = incol.view() cdef unique_ptr[scalar] c_result - cdef unique_ptr[aggregation] c_agg = move(make_aggregation( - reduction_op, kwargs - )) + cdef Aggregation cython_agg = make_aggregation(reduction_op, kwargs) cdef data_type c_out_dtype = dtype_to_data_type(col_dtype) @@ -65,7 +63,7 @@ def reduce(reduction_op, Column incol, dtype=None, **kwargs): with nogil: c_result = move(cpp_reduce( c_incol_view, - c_agg, + cython_agg.c_obj, c_out_dtype )) @@ -95,9 +93,7 @@ def scan(scan_op, Column incol, inclusive, **kwargs): """ cdef column_view c_incol_view = incol.view() cdef unique_ptr[column] c_result - cdef unique_ptr[aggregation] c_agg = move( - make_aggregation(scan_op, kwargs) - ) + cdef Aggregation cython_agg = make_aggregation(scan_op, kwargs) cdef scan_type c_inclusive if inclusive is True: @@ -108,7 +104,7 @@ def scan(scan_op, Column incol, inclusive, **kwargs): with nogil: c_result = move(cpp_scan( c_incol_view, - c_agg, + cython_agg.c_obj, c_inclusive )) diff --git a/python/cudf/cudf/_lib/rolling.pyx b/python/cudf/cudf/_lib/rolling.pyx index 9c818f39c38..d67fb431ec4 100644 --- a/python/cudf/cudf/_lib/rolling.pyx +++ b/python/cudf/cudf/_lib/rolling.pyx @@ -8,12 +8,11 @@ from libcpp.memory cimport unique_ptr from libcpp.utility cimport move from cudf._lib.column cimport Column -from cudf._lib.aggregation cimport make_aggregation +from cudf._lib.aggregation cimport Aggregation, make_aggregation from cudf._lib.cpp.types cimport size_type from cudf._lib.cpp.column.column cimport column from cudf._lib.cpp.column.column_view cimport column_view -from cudf._lib.cpp.aggregation cimport aggregation from cudf._lib.cpp.rolling cimport ( rolling_window as cpp_rolling_window ) @@ -47,14 +46,12 @@ def rolling(Column source_column, Column pre_column_window, cdef column_view source_column_view = source_column.view() cdef column_view pre_column_window_view cdef column_view fwd_column_window_view - cdef unique_ptr[aggregation] agg + cdef Aggregation cython_agg if callable(op): - agg = move( - make_aggregation(op, {'dtype': source_column.dtype}) - ) + cython_agg = make_aggregation(op, {'dtype': source_column.dtype}) else: - agg = move(make_aggregation(op)) + cython_agg = make_aggregation(op) if window is None: if center: @@ -71,7 +68,7 @@ def rolling(Column source_column, Column pre_column_window, pre_column_window_view, fwd_column_window_view, c_min_periods, - agg) + cython_agg.c_obj) ) else: c_min_periods = min_periods @@ -89,7 +86,7 @@ def rolling(Column source_column, Column pre_column_window, c_window, c_forward_window, c_min_periods, - agg) + cython_agg.c_obj) ) return Column.from_unique_ptr(move(c_result)) diff --git a/python/cudf/cudf/core/column/__init__.py b/python/cudf/cudf/core/column/__init__.py index e0aa9471a2f..32cb557548f 100644 --- a/python/cudf/cudf/core/column/__init__.py +++ b/python/cudf/cudf/core/column/__init__.py @@ -7,7 +7,6 @@ as_column, build_categorical_column, build_column, - column_applymap, column_empty, column_empty_like, column_empty_like_same_mask, diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 547e298cc83..ee196e6659f 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -25,7 +25,7 @@ import numpy as np import pandas as pd import pyarrow as pa -from numba import cuda, njit +from numba import cuda import cudf from cudf import _lib as libcudf @@ -41,8 +41,7 @@ from cudf._typing import BinaryOperand, ColumnLike, Dtype, ScalarLike from cudf.core.abc import Serializable from cudf.core.buffer import Buffer -from cudf.core.dtypes import CategoricalDtype -from cudf.core.dtypes import IntervalDtype +from cudf.core.dtypes import CategoricalDtype, IntervalDtype from cudf.utils import ioutils, utils from cudf.utils.dtypes import ( NUMERIC_TYPES, @@ -457,6 +456,16 @@ def _memory_usage(self, **kwargs) -> int: def default_na_value(self) -> Any: raise NotImplementedError() + def applymap( + self, udf: Callable[[ScalarLike], ScalarLike], out_dtype: Dtype = None + ) -> ColumnBase: + """Apply an element-wise function to the values in the Column.""" + # Subclasses that support applymap must override this behavior. + raise TypeError( + "User-defined functions are currently not supported on data " + f"with dtype {self.dtype}." + ) + def to_gpu_array(self, fillna=None) -> "cuda.devicearray.DeviceNDArray": """Get a dense numba device array for the data. @@ -1886,7 +1895,9 @@ def as_column( col = col.set_mask(mask) elif np.issubdtype(col.dtype, np.datetime64): if nan_as_null or (mask is None and nan_as_null is None): - col = utils.time_col_replace_nulls(col) + # Ignore typing error since this method is only defined for + # DatetimeColumn, not the ColumnBase class. + col = col._make_copy_with_na_as_null() # type: ignore return col elif isinstance(arbitrary, (pa.Array, pa.ChunkedArray)): @@ -2007,7 +2018,7 @@ def as_column( data = as_column( buffer, dtype=arbitrary.dtype, nan_as_null=nan_as_null ) - data = utils.time_col_replace_nulls(data) + data = data._make_copy_with_na_as_null() mask = data.mask data = cudf.core.column.datetime.DatetimeColumn( @@ -2027,7 +2038,7 @@ def as_column( data = as_column( buffer, dtype=arbitrary.dtype, nan_as_null=nan_as_null ) - data = utils.time_col_replace_nulls(data) + data = data._make_copy_with_na_as_null() mask = data.mask data = cudf.core.column.timedelta.TimeDeltaColumn( @@ -2208,58 +2219,6 @@ def _construct_array( return arbitrary -def column_applymap( - udf: Callable[[ScalarLike], ScalarLike], - column: ColumnBase, - out_dtype: Dtype, -) -> ColumnBase: - """Apply an element-wise function to transform the values in the Column. - - Parameters - ---------- - udf : function - Wrapped by numba jit for call on the GPU as a device function. - column : Column - The source column. - out_dtype : numpy.dtype - The dtype for use in the output. - - Returns - ------- - result : Column - """ - core = njit(udf) - results = column_empty(len(column), dtype=out_dtype) - values = column.data_array_view - if column.nullable: - # For masked columns - @cuda.jit - def kernel_masked(values, masks, results): - i = cuda.grid(1) - # in range? - if i < values.size: - # valid? - if utils.mask_get(masks, i): - # call udf - results[i] = core(values[i]) - - masks = column.mask_array_view - kernel_masked.forall(len(column))(values, masks, results) - else: - # For non-masked columns - @cuda.jit - def kernel_non_masked(values, results): - i = cuda.grid(1) - # in range? - if i < values.size: - # call udf - results[i] = core(values[i]) - - kernel_non_masked.forall(len(column))(values, results) - - return as_column(results) - - def _data_from_cuda_array_interface_desc(obj) -> Buffer: desc = obj.__cuda_array_interface__ ptr = desc["data"][0] diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index 0bacbe04356..ad91ff3f185 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -16,7 +16,13 @@ from cudf._typing import DatetimeLikeScalar, Dtype, DtypeObj, ScalarLike from cudf.core._compat import PANDAS_GE_120 from cudf.core.buffer import Buffer -from cudf.core.column import ColumnBase, column, string +from cudf.core.column import ( + ColumnBase, + as_column, + column, + column_empty_like, + string, +) from cudf.utils.dtypes import is_scalar from cudf.utils.utils import _fillna_natwise @@ -306,7 +312,7 @@ def fillna( self, fill_value: Any = None, method: str = None, dtype: Dtype = None ) -> DatetimeColumn: if fill_value is not None: - if cudf.utils.utils.isnat(fill_value): + if cudf.utils.utils._isnat(fill_value): return _fillna_natwise(self) if is_scalar(fill_value): if not isinstance(fill_value, cudf.Scalar): @@ -372,6 +378,23 @@ def can_cast_safely(self, to_dtype: Dtype) -> bool: else: return False + def _make_copy_with_na_as_null(self): + """Return a copy with NaN values replaced with nulls.""" + null = column_empty_like(self, masked=True, newsize=1) + out_col = cudf._lib.replace.replace( + self, + as_column( + Buffer( + np.array([self.default_na_value()], dtype=self.dtype).view( + "|u1" + ) + ), + dtype=self.dtype, + ), + null, + ) + return out_col + @annotate("BINARY_OP", color="orange", domain="cudf_python") def binop( diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index 10a9ffbfbae..70b4569b180 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -7,6 +7,7 @@ import numpy as np import pandas as pd +from numba import cuda, njit from nvtx import annotate from pandas.api.types import is_integer_dtype @@ -20,6 +21,7 @@ as_column, build_column, column, + column_empty, string, ) from cudf.core.dtypes import Decimal64Dtype @@ -422,8 +424,22 @@ def applymap( """ if out_dtype is None: out_dtype = self.dtype - out = column.column_applymap(udf=udf, column=self, out_dtype=out_dtype) - return out + + core = njit(udf) + + # For non-masked columns + @cuda.jit + def kernel_applymap(values, results): + i = cuda.grid(1) + # in range? + if i < values.size: + # call udf + results[i] = core(values[i]) + + results = column_empty(self.size, dtype=out_dtype) + values = self.data_array_view + kernel_applymap.forall(self.size)(values, results) + return as_column(results) def default_na_value(self) -> ScalarLike: """Returns the default NA value for this column diff --git a/python/cudf/cudf/core/column/timedelta.py b/python/cudf/cudf/core/column/timedelta.py index a39638106bb..d8ad11f41b3 100644 --- a/python/cudf/cudf/core/column/timedelta.py +++ b/python/cudf/cudf/core/column/timedelta.py @@ -306,7 +306,7 @@ def fillna( self, fill_value: Any = None, method: str = None, dtype: Dtype = None ) -> TimeDeltaColumn: if fill_value is not None: - if cudf.utils.utils.isnat(fill_value): + if cudf.utils.utils._isnat(fill_value): return _fillna_natwise(self) col = self # type: column.ColumnBase if is_scalar(fill_value): diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index cc94548d9a2..a52fae994e7 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1,6 +1,5 @@ # Copyright (c) 2020, NVIDIA CORPORATION. import collections -import functools import pickle import warnings @@ -570,47 +569,106 @@ def rolling(self, *args, **kwargs): """ return cudf.core.window.rolling.RollingGroupby(self, *args, **kwargs) + def count(self, dropna=True): + """Compute the number of values in each column. -# Set of valid groupby aggregations that are monkey-patched into the GroupBy -# namespace. -_VALID_GROUPBY_AGGS = { - "count", - "sum", - "idxmin", - "idxmax", - "min", - "max", - "mean", - "var", - "std", - "quantile", - "median", - "nunique", - "collect", - "unique", -} - - -# Dynamically bind the different aggregation methods. -def _agg_func_name_with_args(self, func_name, *args, **kwargs): - """ - Aggregate given an aggregate function name and arguments to the - function, e.g., `_agg_func_name_with_args("quantile", 0.5)`. The named - aggregations must be members of _AggregationFactory. - """ + Parameters + ---------- + dropna : bool + If ``True``, don't include null values in the count. + """ + + def func(x): + return getattr(x, "count")(dropna=dropna) + + return self.agg(func) + + def sum(self): + """Compute the column-wise sum of the values in each group.""" + return self.agg("sum") + + def idxmin(self): + """Get the column-wise index of the minimum value in each group.""" + return self.agg("idxmin") - def func(x): - """Compute the {} of the group.""".format(func_name) - return getattr(x, func_name)(*args, **kwargs) + def idxmax(self): + """Get the column-wise index of the maximum value in each group.""" + return self.agg("idxmax") + + def min(self): + """Get the column-wise minimum value in each group.""" + return self.agg("min") + + def max(self): + """Get the column-wise maximum value in each group.""" + return self.agg("max") + + def mean(self): + """Compute the column-wise mean of the values in each group.""" + return self.agg("mean") + + def median(self): + """Get the column-wise median of the values in each group.""" + return self.agg("median") + + def var(self, ddof=1): + """Compute the column-wise variance of the values in each group. + + Parameters + ---------- + ddof : int + The delta degrees of freedom. N - ddof is the divisor used to + normalize the variance. + """ - func.__name__ = func_name - return self.agg(func) + def func(x): + return getattr(x, "var")(ddof=ddof) + + return self.agg(func) + + def std(self, ddof=1): + """Compute the column-wise std of the values in each group. + + Parameters + ---------- + ddof : int + The delta degrees of freedom. N - ddof is the divisor used to + normalize the standard deviation. + """ + + def func(x): + return getattr(x, "std")(ddof=ddof) + + return self.agg(func) + + def quantile(self, q=0.5, interpolation="linear"): + """Compute the column-wise quantiles of the values in each group. + + Parameters + ---------- + q : float or array-like + The quantiles to compute. + interpolation : {"linear", "lower", "higher", "midpoint", "nearest"} + The interpolation method to use when the desired quantile lies + between two data points. Defaults to "linear". + """ + + def func(x): + return getattr(x, "quantile")(q=q, interpolation=interpolation) + + return self.agg(func) + + def nunique(self): + """Compute the number of unique values in each column in each group.""" + return self.agg("nunique") + def collect(self): + """Get a list of all the values for each column in each group.""" + return self.agg("collect") -for key in _VALID_GROUPBY_AGGS: - setattr( - GroupBy, key, functools.partialmethod(_agg_func_name_with_args, key) - ) + def unique(self): + """Get a list of the unique values for each column in each group.""" + return self.agg("unique") class DataFrameGroupBy(GroupBy): diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index f65afb6a1d4..0ffe0c11fef 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -30,7 +30,7 @@ from cudf.core.column.string import StringMethods as StringMethods from cudf.core.dtypes import IntervalDtype from cudf.core.frame import Frame -from cudf.utils import ioutils, utils +from cudf.utils import ioutils from cudf.utils.docutils import copy_docstring from cudf.utils.dtypes import ( find_common_type, @@ -1734,8 +1734,9 @@ def __len__(self): return len(range(self._start, self._stop, self._step)) def __getitem__(self, index): + len_self = len(self) if isinstance(index, slice): - sl_start, sl_stop, sl_step = index.indices(len(self)) + sl_start, sl_stop, sl_step = index.indices(len_self) lo = self._start + sl_start * self._step hi = self._start + sl_stop * self._step @@ -1743,7 +1744,11 @@ def __getitem__(self, index): return RangeIndex(start=lo, stop=hi, step=st, name=self._name) elif isinstance(index, Number): - index = utils.normalize_index(index, len(self)) + if index < 0: + index = len_self + index + if not (0 <= index < len_self): + raise IndexError("out-of-bound") + index = min(index, len_self) index = self._start + index * self._step return index else: diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index c8b3f9f0a36..4cc5fb56a4c 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -43,7 +43,7 @@ from cudf.core.index import Index, RangeIndex, as_index from cudf.core.indexing import _SeriesIlocIndexer, _SeriesLocIndexer from cudf.core.window import Rolling -from cudf.utils import cudautils, docutils, ioutils, utils +from cudf.utils import cudautils, docutils, ioutils from cudf.utils.docutils import copy_docstring from cudf.utils.dtypes import ( can_convert_to_column, @@ -52,7 +52,6 @@ is_list_like, is_mixed_with_object_dtype, is_scalar, - is_string_dtype, min_scalar_type, numeric_normalize_types, ) @@ -1505,7 +1504,6 @@ def _binaryop( if isinstance(other, cudf.DataFrame): return NotImplemented - result_name = utils.get_result_name(self, other) if isinstance(other, Series): if not can_reindex and fn in cudf.utils.utils._EQUALITY_OPS: if not self.index.equals(other.index): @@ -1544,8 +1542,19 @@ def _binaryop( rhs = rhs.fillna(fill_value) outcol = lhs._column.binary_operator(fn, rhs, reflect=reflect) - result = lhs._copy_construct(data=outcol, name=result_name) - return result + + # Get the appropriate name for output operations involving two objects + # that are a mix of pandas and cudf Series and Index. If the two inputs + # are identically named, the output shares this name. + if isinstance(other, (cudf.Series, cudf.Index, pd.Series, pd.Index)): + if self.name == other.name: + result_name = self.name + else: + result_name = None + else: + result_name = self.name + + return lhs._copy_construct(data=outcol, name=result_name) def add(self, other, fill_value=None, axis=0): """ @@ -4365,14 +4374,6 @@ def applymap(self, udf, out_dtype=None): 4 105 dtype: int64 """ - if is_string_dtype(self._column.dtype) or isinstance( - self._column, cudf.core.column.CategoricalColumn - ): - raise TypeError( - "User defined functions are currently not " - "supported on Series with dtypes `str` and `category`." - ) - if callable(udf): res_col = self._unaryop(udf) else: diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 4dbe608af82..868387b100e 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -1236,7 +1236,11 @@ def test_raise_data_error(): pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]}) gdf = cudf.from_pandas(pdf) - assert_exceptions_equal(pdf.groupby("a").mean, gdf.groupby("a").mean) + assert_exceptions_equal( + pdf.groupby("a").mean, + gdf.groupby("a").mean, + compare_error_message=False, + ) def test_drop_unsupported_multi_agg(): diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index 722e0b12183..262fe304dd8 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -2,46 +2,12 @@ from pickle import dumps import cachetools -import cupy import numpy as np from numba import cuda import cudf -from cudf.utils.utils import check_equals_float, check_equals_int -try: - # Numba >= 0.49 - from numba.np import numpy_support -except ImportError: - # Numba <= 0.49 - from numba import numpy_support - - -# GPU array type casting - - -def as_contiguous(arr): - assert arr.ndim == 1 - cupy_dtype = arr.dtype - if np.issubdtype(cupy_dtype, np.datetime64): - cupy_dtype = np.dtype("int64") - arr = arr.view("int64") - out = cupy.ascontiguousarray(cupy.asarray(arr)) - return cuda.as_cuda_array(out).view(arr.dtype) - - -# Mask utils - - -def full(size, value, dtype): - cupy_dtype = dtype - if np.issubdtype(cupy_dtype, np.datetime64): - time_unit, _ = np.datetime_data(cupy_dtype) - cupy_dtype = np.int64 - value = np.datetime64(value, time_unit).view(cupy_dtype) - - out = cupy.full(size, value, cupy_dtype) - return cuda.as_cuda_array(out).view(dtype) +from numba.np import numpy_support # @@ -77,7 +43,7 @@ def gpu_diff(in_col, out_col, out_mask, N): def gpu_mark_found_int(arr, val, out, not_found): i = cuda.grid(1) if i < arr.size: - if check_equals_int(arr[i], val): + if arr[i] == val: out[i] = i else: out[i] = not_found @@ -92,7 +58,10 @@ def gpu_mark_found_float(arr, val, out, not_found): # at 0.51.1, this will have a very slight # performance improvement. Related # discussion in : https://github.com/rapidsai/cudf/pull/6073 - if check_equals_float(arr[i], float(val)): + val = float(val) + + # NaN-aware equality comparison. + if (arr[i] == val) or (arr[i] != arr[i] and val != val): out[i] = i else: out[i] = not_found diff --git a/python/cudf/cudf/utils/utils.py b/python/cudf/cudf/utils/utils.py index c69ccb0f42e..d9bae91e123 100644 --- a/python/cudf/cudf/utils/utils.py +++ b/python/cudf/cudf/utils/utils.py @@ -2,12 +2,10 @@ import functools from collections.abc import Sequence -from math import floor, isinf, isnan import cupy as cp import numpy as np import pandas as pd -from numba import njit import rmm @@ -16,9 +14,11 @@ from cudf.core.buffer import Buffer from cudf.utils.dtypes import to_cudf_compatible_scalar +# The size of the mask in bytes mask_dtype = np.dtype(np.int32) mask_bitsize = mask_dtype.itemsize * 8 + _EQUALITY_OPS = { "eq", "ne", @@ -35,46 +35,6 @@ } -@njit -def mask_get(mask, pos): - return (mask[pos // mask_bitsize] >> (pos % mask_bitsize)) & 1 - - -@njit -def check_equals_float(a, b): - return ( - a == b - or (isnan(a) and isnan(b)) - or ((isinf(a) and a < 0) and (isinf(b) and b < 0)) - or ((isinf(a) and a > 0) and (isinf(b) and b > 0)) - ) - - -@njit -def rint(x): - """Round to the nearest integer. - - Returns - ------- - The nearest integer, as a float. - """ - y = floor(x) - r = x - y - - if r > 0.5: - y += 1.0 - if r == 0.5: - r = y - 2.0 * floor(0.5 * y) - if r == 1.0: - y += 1.0 - return y - - -@njit -def check_equals_int(a, b): - return a == b - - def scalar_broadcast_to(scalar, size, dtype=None): if isinstance(size, (tuple, list)): @@ -110,72 +70,6 @@ def scalar_broadcast_to(scalar, size, dtype=None): return out_col -def normalize_index(index, size, doraise=True): - """Normalize negative index - """ - if index < 0: - index = size + index - if doraise and not (0 <= index < size): - raise IndexError("out-of-bound") - return min(index, size) - - -list_types_tuple = (list, np.array) - - -def get_result_name(left, right): - """ - This function will give appropriate name for the operations - involving two Series, Index's or combination of both. - - Parameters - ---------- - left : {Series, Index} - right : object - - Returns - ------- - name : object {string or None} - """ - - if isinstance(right, (cudf.Series, cudf.Index, pd.Series, pd.Index)): - name = compare_and_get_name(left, right) - else: - name = left.name - return name - - -def compare_and_get_name(a, b): - """ - If both a & b have name attribute, and they are - same return the common name. - Else, return either one of the name of a or b, - whichever is present. - - Parameters - ---------- - a : object - b : object - - Returns - ------- - name : str or None - """ - a_has = hasattr(a, "name") - b_has = hasattr(b, "name") - - if a_has and b_has: - if a.name == b.name: - return a.name - else: - return None - elif a_has: - return a.name - elif b_has: - return b.name - return None - - def initfunc(f): """ Decorator for initialization functions that should @@ -193,24 +87,6 @@ def wrapper(*args, **kwargs): return wrapper -def get_null_series(size, dtype=np.bool_): - """ - Creates a null series of provided dtype and size - - Parameters - ---------- - size: length of series - dtype: dtype of series to create; defaults to bool. - - Returns - ------- - a null cudf series of provided `size` and `dtype` - """ - - empty_col = column.column_empty(size, dtype, True) - return cudf.Series(empty_col) - - # taken from dask array # https://github.com/dask/dask/blob/master/dask/array/utils.py#L352-L363 def _is_nep18_active(): @@ -267,6 +143,9 @@ class cached_property: it with `del`. """ + # TODO: Can be replaced with functools.cached_property when we drop support + # for Python 3.7. + def __init__(self, func): self.func = func @@ -279,24 +158,6 @@ def __get__(self, instance, cls): return value -def time_col_replace_nulls(input_col): - - null = column.column_empty_like(input_col, masked=True, newsize=1) - out_col = cudf._lib.replace.replace( - input_col, - column.as_column( - Buffer( - np.array( - [input_col.default_na_value()], dtype=input_col.dtype - ).view("|u1") - ), - dtype=input_col.dtype, - ), - null, - ) - return out_col - - def raise_iteration_error(obj): raise TypeError( f"{obj.__class__.__name__} object is not iterable. " @@ -317,7 +178,8 @@ def pa_mask_buffer_to_mask(mask_buf, size): return Buffer(mask_buf) -def isnat(val): +def _isnat(val): + """Wraps np.isnat to return False instead of error on invalid inputs.""" if not isinstance(val, (np.datetime64, np.timedelta64, str)): return False else: diff --git a/python/cudf/requirements/cuda-10.1/dev_requirements.txt b/python/cudf/requirements/cuda-10.1/dev_requirements.txt deleted file mode 100644 index 967974d38b5..00000000000 --- a/python/cudf/requirements/cuda-10.1/dev_requirements.txt +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. - -# pyarrow gpu package will have to be built from source : -# https://arrow.apache.org/docs/python/install.html#installing-from-source - -cupy-cuda101 -cachetools -cmake -cmake-setuptools>=0.1.3 -cython>=0.29,<0.30 -dlpack -fastavro>=0.22.9 -flatbuffers -fsspec>=0.6.0 -hypothesis -mimesis -mypy==0.782 -nbsphinx -numba>=0.49.0,!=0.51.0 -numpy -numpydoc -nvtx>=0.2.1 -packaging -pandas>=1.0,<1.3.0dev0 -pandoc==2.0a4 -protobuf -pyorc -pytest -pytest-benchmark -pytest-xdist -rapidjson -recommonmark -setuptools -sphinx -sphinx-copybutton -sphinx-markdown-tables -sphinx_rtd_theme -sphinxcontrib-websupport -typing_extensions -typing_extensions -wheel \ No newline at end of file diff --git a/python/cudf/requirements/cuda-10.2/dev_requirements.txt b/python/cudf/requirements/cuda-10.2/dev_requirements.txt deleted file mode 100644 index 34450456b5a..00000000000 --- a/python/cudf/requirements/cuda-10.2/dev_requirements.txt +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. - -# pyarrow gpu package will have to be built from source : -# https://arrow.apache.org/docs/python/install.html#installing-from-source - -cupy-cuda102 -cachetools -cmake -cmake-setuptools>=0.1.3 -cython>=0.29,<0.30 -dlpack -fastavro>=0.22.9 -flatbuffers -fsspec>=0.6.0 -hypothesis -mimesis -mypy==0.782 -nbsphinx -numba>=0.49.0,!=0.51.0 -numpy -numpydoc -nvtx>=0.2.1 -packaging -pandas>=1.0,<1.3.0dev0 -pandoc==2.0a4 -protobuf -pyorc -pytest -pytest-benchmark -pytest-xdist -rapidjson -recommonmark -setuptools -sphinx -sphinx-copybutton -sphinx-markdown-tables -sphinx_rtd_theme -sphinxcontrib-websupport -typing_extensions -typing_extensions -wheel \ No newline at end of file