diff --git a/.github/workflows/jni-docker-build.yml b/.github/workflows/jni-docker-build.yml new file mode 100644 index 00000000000..0bdc409d0ab --- /dev/null +++ b/.github/workflows/jni-docker-build.yml @@ -0,0 +1,53 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: JNI Docker Build + +on: + workflow_dispatch: # manual trigger only + +concurrency: + group: jni-docker-build-${{ github.ref }} + cancel-in-progress: true + +jobs: + docker-build: + if: github.repository == 'rapidsai/cudf' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.GPUCIBOT_DOCKERHUB_USER }} + password: ${{ secrets.GPUCIBOT_DOCKERHUB_TOKEN }} + + - name: Set ENVs + run: | + echo "IMAGE_NAME=rapidsai/cudf-jni-build" >> $GITHUB_ENV + echo "IMAGE_REF=${GITHUB_REF_NAME}" >> $GITHUB_ENV + + - name: Build and Push + uses: docker/build-push-action@v3 + with: + push: true + file: java/ci/Dockerfile.centos7 + tags: "${{ env.IMAGE_NAME }}:${{ env.IMAGE_REF }}" diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 1b79bdb763f..d6d05926099 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -67,6 +67,11 @@ dependencies: - pydata-sphinx-theme - librdkafka=1.7.0 - python-confluent-kafka=1.7.0 + - moto>=3.1.6 + - boto3>=1.21.21 + - botocore>=1.24.21 + - aiobotocore>=2.2.0 + - s3fs>=2022.3.0 - pip: - git+https://github.com/python-streamz/streamz.git@master - pyorc diff --git a/cpp/include/cudf/detail/search.hpp b/cpp/include/cudf/detail/search.hpp index c986418c790..44067ff87c0 100644 --- a/cpp/include/cudf/detail/search.hpp +++ b/cpp/include/cudf/detail/search.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,11 +33,11 @@ namespace detail { * @param stream CUDA stream used for device memory operations and kernel launches. */ std::unique_ptr lower_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -46,33 +46,29 @@ std::unique_ptr lower_bound( * @param stream CUDA stream used for device memory operations and kernel launches. 
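Note on the detail-API change above: `rmm::cuda_stream_default` is no longer a default argument, so internal callers must pass the stream explicitly. A minimal sketch of a hypothetical internal caller (the helper name and the empty ordering vectors are illustrative only, not part of the patch):

#include <cudf/column/column.hpp>
#include <cudf/detail/search.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <memory>

// Hypothetical helper: forwards its stream to the detail overload; the memory
// resource keeps its default of the current device resource.
std::unique_ptr<cudf::column> find_insert_points(cudf::table_view const& haystack,
                                                 cudf::table_view const& needles,
                                                 rmm::cuda_stream_view stream)
{
  return cudf::detail::lower_bound(haystack,
                                   needles,
                                   /*column_order=*/{},
                                   /*null_precedence=*/{},
                                   stream);
}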
*/ std::unique_ptr upper_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @copydoc cudf::contains(column_view const&, scalar const&, - * rmm::mr::device_memory_resource*) + * @copydoc cudf::contains(column_view const&, scalar const&, rmm::mr::device_memory_resource*) * * @param stream CUDA stream used for device memory operations and kernel launches. */ -bool contains(column_view const& col, - scalar const& value, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); +bool contains(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream); /** - * @copydoc cudf::contains(column_view const&, column_view const&, - * rmm::mr::device_memory_resource*) + * @copydoc cudf::contains(column_view const&, column_view const&, rmm::mr::device_memory_resource*) * * @param stream CUDA stream used for device memory operations and kernel launches. */ std::unique_ptr contains( column_view const& haystack, column_view const& needles, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); } // namespace detail diff --git a/cpp/include/cudf/search.hpp b/cpp/include/cudf/search.hpp index 56a31891e27..3b68923ee93 100644 --- a/cpp/include/cudf/search.hpp +++ b/cpp/include/cudf/search.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,134 +32,123 @@ namespace cudf { */ /** - * @brief Find smallest indices in a sorted table where values should be - * inserted to maintain order + * @brief Find smallest indices in a sorted table where values should be inserted to maintain order. * - * For each row v in @p values, find the first index in @p t where - * inserting the row will maintain the sort order of @p t + * For each row in `needles`, find the first index in `haystack` where inserting the row still + * maintains its sort order. * * @code{.pseudo} * Example: * * Single column: - * idx 0 1 2 3 4 - * column = { 10, 20, 20, 30, 50 } - * values = { 20 } - * result = { 1 } + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needles = { 20 } + * result = { 1 } * * Multi Column: - * idx 0 1 2 3 4 - * t = {{ 10, 20, 20, 20, 20 }, - * { 5.0, .5, .5, .7, .7 }, - * { 90, 77, 78, 61, 61 }} - * values = {{ 20 }, - * { .7 }, - * { 61 }} - * result = { 3 } + * idx 0 1 2 3 4 + * haystack = {{ 10, 20, 20, 20, 20 }, + * { 5.0, .5, .5, .7, .7 }, + * { 90, 77, 78, 61, 61 }} + * needles = {{ 20 }, + * { .7 }, + * { 61 }} + * result = { 3 } * @endcode * - * @param t Table to search - * @param values Find insert locations for these values - * @param column_order Vector of column sort order - * @param null_precedence Vector of null_precedence enums values - * @param mr Device memory resource used to allocate the returned column's device - * memory + * @param haystack The table containing search space. + * @param needles Values for which to find the insert locations in the search space. + * @param column_order Vector of column sort order. 
+ * @param null_precedence Vector of null_precedence enums needles. + * @param mr Device memory resource used to allocate the returned column's device memory. * @return A non-nullable column of cudf::size_type elements containing the insertion points. */ std::unique_ptr lower_bound( - table_view const& t, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @brief Find largest indices in a sorted table where values should be - * inserted to maintain order + * @brief Find largest indices in a sorted table where values should be inserted to maintain order. * - * For each row v in @p values, find the last index in @p t where - * inserting the row will maintain the sort order of @p t + * For each row in `needles`, find the last index in `haystack` where inserting the row still + * maintains its sort order. * * @code{.pseudo} * Example: * * Single Column: - * idx 0 1 2 3 4 - * column = { 10, 20, 20, 30, 50 } - * values = { 20 } - * result = { 3 } + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needles = { 20 } + * result = { 3 } * * Multi Column: - * idx 0 1 2 3 4 - * t = {{ 10, 20, 20, 20, 20 }, - * { 5.0, .5, .5, .7, .7 }, - * { 90, 77, 78, 61, 61 }} - * values = {{ 20 }, - * { .7 }, - * { 61 }} - * result = { 5 } + * idx 0 1 2 3 4 + * haystack = {{ 10, 20, 20, 20, 20 }, + * { 5.0, .5, .5, .7, .7 }, + * { 90, 77, 78, 61, 61 }} + * needles = {{ 20 }, + * { .7 }, + * { 61 }} + * result = { 5 } * @endcode * - * @param search_table Table to search - * @param values Find insert locations for these values - * @param column_order Vector of column sort order - * @param null_precedence Vector of null_precedence enums values - * @param mr Device memory resource used to allocate the returned column's device - * memory + * @param haystack The table containing search space. + * @param needles Values for which to find the insert locations in the search space. + * @param column_order Vector of column sort order. + * @param null_precedence Vector of null_precedence enums needles. + * @param mr Device memory resource used to allocate the returned column's device memory. * @return A non-nullable column of cudf::size_type elements containing the insertion points. */ std::unique_ptr upper_bound( - table_view const& search_table, - table_view const& values, + table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @brief Find if the `value` is present in the `col` + * @brief Check if the given `needle` value exists in the `haystack` column. * - * @throws cudf::logic_error - * If `col.type() != values.type()` + * @throws cudf::logic_error If `haystack.type() != needle.type()`. * * @code{.pseudo} * Single Column: - * idx 0 1 2 3 4 - * col = { 10, 20, 20, 30, 50 } - * Scalar: - * value = { 20 } - * result = true + * idx 0 1 2 3 4 + * haystack = { 10, 20, 20, 30, 50 } + * needle = { 20 } + * result = true * @endcode * - * @param col A column object - * @param value A scalar value to search for in `col` - * - * @return bool If `value` is found in `column` true, else false. + * @param haystack The column containing search space. + * @param needle A scalar value to check for existence in the search space. 
+ * @return true if the given `needle` value exists in the `haystack` column. */ -bool contains(column_view const& col, scalar const& value); +bool contains(column_view const& haystack, scalar const& needle); /** - * @brief Returns a new column of type bool identifying for each element of @p haystack column, - * if that element is contained in @p needles column. + * @brief Check if the given `needles` values exists in the `haystack` column. * - * The new column will have the same dimension and null status as the @p haystack column. That is, - * any element that is invalid in the @p haystack column will be invalid in the returned column. + * The new column will have type BOOL and have the same size and null mask as the input `needles` + * column. That is, any null row in the `needles` column will result in a nul row in the output + * column. * - * @throws cudf::logic_error - * If `haystack.type() != needles.type()` + * @throws cudf::logic_error If `haystack.type() != needles.type()` * * @code{.pseudo} * haystack = { 10, 20, 30, 40, 50 } * needles = { 20, 40, 60, 80 } - * - * result = { false, true, false, true, false } + * result = { true, true, false, false } * @endcode * - * @param haystack A column object - * @param needles A column of values to search for in `col` - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @return A column of bool elements containing true if the corresponding entry in haystack - * appears in needles and false if it does not. + * @param haystack The column containing search space. + * @param needles A column of values to check for existence in the search space. + * @param mr Device memory resource used to allocate the returned column's device memory. + * @return A BOOL column indicating if each element in `needles` exists in the search space. 
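To make the renamed parameters and the new output semantics concrete, here is a small illustrative sketch (not part of the patch) using the libcudf test wrappers; it mirrors the pseudo-code above, with the result of the column overload now aligned with `needles` rather than `haystack`:

#include <cudf/scalar/scalar.hpp>
#include <cudf/search.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

void contains_and_bounds_example()
{
  cudf::test::fixed_width_column_wrapper<int32_t> haystack{10, 20, 30, 40, 50};
  cudf::test::fixed_width_column_wrapper<int32_t> needles{20, 40, 60, 80};

  // One BOOL8 row per needle: {true, true, false, false}
  auto const result = cudf::contains(haystack, needles);

  // The scalar overload still returns a single bool.
  bool const found = cudf::contains(haystack, cudf::numeric_scalar<int32_t>{20});  // true

  // lower_bound/upper_bound use the same haystack/needles naming.
  auto const lb = cudf::lower_bound(cudf::table_view{{haystack}},
                                    cudf::table_view{{needles}},
                                    {cudf::order::ASCENDING},
                                    {cudf::null_order::BEFORE});  // {1, 3, 5, 5}
}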
*/ std::unique_ptr contains( column_view const& haystack, diff --git a/cpp/src/dictionary/remove_keys.cu b/cpp/src/dictionary/remove_keys.cu index c4b3bbc00e4..a98e69149af 100644 --- a/cpp/src/dictionary/remove_keys.cu +++ b/cpp/src/dictionary/remove_keys.cu @@ -158,7 +158,7 @@ std::unique_ptr remove_keys( CUDF_EXPECTS(keys_view.type() == keys_to_remove.type(), "keys types must match"); // locate keys to remove by searching the keys column - auto const matches = cudf::detail::contains(keys_view, keys_to_remove, stream, mr); + auto const matches = cudf::detail::contains(keys_to_remove, keys_view, stream, mr); auto d_matches = matches->view().data(); // call common utility method to keep the keys not matched to keys_to_remove auto key_matcher = [d_matches] __device__(size_type idx) { return !d_matches[idx]; }; @@ -181,7 +181,7 @@ std::unique_ptr remove_unused_keys( thrust::sequence(rmm::exec_policy(stream), keys_positions.begin(), keys_positions.end()); // wrap the indices for comparison in contains() column_view keys_positions_view(data_type{type_id::UINT32}, keys_size, keys_positions.data()); - return cudf::detail::contains(keys_positions_view, indices_view, stream, mr); + return cudf::detail::contains(indices_view, keys_positions_view, stream, mr); }(); auto d_matches = matches->view().data(); diff --git a/cpp/src/dictionary/set_keys.cu b/cpp/src/dictionary/set_keys.cu index dfc6cbb78cc..25c46837e9f 100644 --- a/cpp/src/dictionary/set_keys.cu +++ b/cpp/src/dictionary/set_keys.cu @@ -138,7 +138,7 @@ std::unique_ptr set_keys( std::unique_ptr keys_column(std::move(sorted_keys.front())); // compute the new nulls - auto matches = cudf::detail::contains(keys, keys_column->view(), stream, mr); + auto matches = cudf::detail::contains(keys_column->view(), keys, stream, mr); auto d_matches = matches->view().data(); auto indices_itr = cudf::detail::indexalator_factory::make_input_iterator(dictionary_column.indices()); diff --git a/cpp/src/search/search.cu b/cpp/src/search/search.cu index 29eddf703df..491ad49e020 100644 --- a/cpp/src/search/search.cu +++ b/cpp/src/search/search.cu @@ -43,40 +43,9 @@ namespace cudf { namespace { -template -void launch_search(DataIterator it_data, - ValuesIterator it_vals, - size_type data_size, - size_type values_size, - OutputIterator it_output, - Comparator comp, - bool find_first, - rmm::cuda_stream_view stream) -{ - if (find_first) { - thrust::lower_bound(rmm::exec_policy(stream), - it_data, - it_data + data_size, - it_vals, - it_vals + values_size, - it_output, - comp); - } else { - thrust::upper_bound(rmm::exec_policy(stream), - it_data, - it_data + data_size, - it_vals, - it_vals + values_size, - it_output, - comp); - } -} -std::unique_ptr search_ordered(table_view const& t, - table_view const& values, +std::unique_ptr search_ordered(table_view const& haystack, + table_view const& needles, bool find_first, std::vector const& column_order, std::vector const& null_precedence, @@ -84,30 +53,30 @@ std::unique_ptr search_ordered(table_view const& t, rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS( - column_order.empty() or static_cast(t.num_columns()) == column_order.size(), + column_order.empty() or static_cast(haystack.num_columns()) == column_order.size(), "Mismatch between number of columns and column order."); - CUDF_EXPECTS( - null_precedence.empty() or static_cast(t.num_columns()) == null_precedence.size(), - "Mismatch between number of columns and null precedence."); + CUDF_EXPECTS(null_precedence.empty() or + static_cast(haystack.num_columns()) == 
null_precedence.size(), + "Mismatch between number of columns and null precedence."); // Allocate result column auto result = make_numeric_column( - data_type{type_to_id()}, values.num_rows(), mask_state::UNALLOCATED, stream, mr); - auto const result_out = result->mutable_view().data(); + data_type{type_to_id()}, needles.num_rows(), mask_state::UNALLOCATED, stream, mr); + auto const out_it = result->mutable_view().data(); // Handle empty inputs - if (t.num_rows() == 0) { + if (haystack.num_rows() == 0) { CUDF_CUDA_TRY( - cudaMemsetAsync(result_out, 0, values.num_rows() * sizeof(size_type), stream.value())); + cudaMemsetAsync(out_it, 0, needles.num_rows() * sizeof(size_type), stream.value())); return result; } // This utility will ensure all corresponding dictionary columns have matching keys. // It will return any new dictionary columns created as well as updated table_views. - auto const matched = dictionary::detail::match_dictionaries({t, values}, stream); + auto const matched = dictionary::detail::match_dictionaries({haystack, needles}, stream); // Prepare to flatten the structs column - auto const has_null_elements = has_nested_nulls(t) or has_nested_nulls(values); + auto const has_null_elements = has_nested_nulls(haystack) or has_nested_nulls(needles); auto const flatten_nullability = has_null_elements ? structs::detail::column_nullability::FORCE : structs::detail::column_nullability::MATCH_INCOMING; @@ -135,37 +104,50 @@ std::unique_ptr search_ordered(table_view const& t, rhs, column_order_dv.data(), null_precedence_dv.data()); - launch_search( - count_it, count_it, t.num_rows(), values.num_rows(), result_out, comp, find_first, stream); + + auto const do_search = [find_first](auto&&... args) { + if (find_first) { + thrust::lower_bound(std::forward(args)...); + } else { + thrust::upper_bound(std::forward(args)...); + } + }; + do_search(rmm::exec_policy(stream), + count_it, + count_it + haystack.num_rows(), + count_it, + count_it + needles.num_rows(), + out_it, + comp); return result; } struct contains_scalar_dispatch { template - bool operator()(column_view const& col, scalar const& value, rmm::cuda_stream_view stream) + bool operator()(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(col.type() == value.type(), "scalar and column types must match"); + CUDF_EXPECTS(haystack.type() == needle.type(), "scalar and column types must match"); using Type = device_storage_type_t; using ScalarType = cudf::scalar_type_t; - auto d_col = column_device_view::create(col, stream); - auto s = static_cast(&value); + auto d_haystack = column_device_view::create(haystack, stream); + auto s = static_cast(&needle); - if (col.has_nulls()) { + if (haystack.has_nulls()) { auto found_iter = thrust::find(rmm::exec_policy(stream), - d_col->pair_begin(), - d_col->pair_end(), + d_haystack->pair_begin(), + d_haystack->pair_end(), thrust::make_pair(s->value(stream), true)); - return found_iter != d_col->pair_end(); + return found_iter != d_haystack->pair_end(); } else { auto found_iter = thrust::find(rmm::exec_policy(stream), // - d_col->begin(), - d_col->end(), + d_haystack->begin(), + d_haystack->end(), s->value(stream)); - return found_iter != d_col->end(); + return found_iter != d_haystack->end(); } } }; @@ -179,66 +161,69 @@ bool contains_scalar_dispatch::operator()(column_view const&, } template <> -bool contains_scalar_dispatch::operator()(column_view const& col, - scalar const& value, +bool contains_scalar_dispatch::operator()(column_view const& haystack, + 
scalar const& needle, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(col.type() == value.type(), "scalar and column types must match"); + CUDF_EXPECTS(haystack.type() == needle.type(), "scalar and column types must match"); - auto const scalar_table = static_cast(&value)->view(); - CUDF_EXPECTS(col.num_children() == scalar_table.num_columns(), + auto const scalar_table = static_cast(&needle)->view(); + CUDF_EXPECTS(haystack.num_children() == scalar_table.num_columns(), "struct scalar and structs column must have the same number of children"); - for (size_type i = 0; i < col.num_children(); ++i) { - CUDF_EXPECTS(col.child(i).type() == scalar_table.column(i).type(), + for (size_type i = 0; i < haystack.num_children(); ++i) { + CUDF_EXPECTS(haystack.child(i).type() == scalar_table.column(i).type(), "scalar and column children types must match"); } // Prepare to flatten the structs column and scalar. - auto const has_null_elements = - has_nested_nulls(table_view{std::vector{col.child_begin(), col.child_end()}}) || - has_nested_nulls(scalar_table); + auto const has_null_elements = has_nested_nulls(table_view{std::vector{ + haystack.child_begin(), haystack.child_end()}}) || + has_nested_nulls(scalar_table); auto const flatten_nullability = has_null_elements ? structs::detail::column_nullability::FORCE : structs::detail::column_nullability::MATCH_INCOMING; // Flatten the input structs column, only materialize the bitmask if there is null in the input. - auto const col_flattened = - structs::detail::flatten_nested_columns(table_view{{col}}, {}, {}, flatten_nullability); - auto const val_flattened = + auto const haystack_flattened = + structs::detail::flatten_nested_columns(table_view{{haystack}}, {}, {}, flatten_nullability); + auto const needle_flattened = structs::detail::flatten_nested_columns(scalar_table, {}, {}, flatten_nullability); // The struct scalar only contains the struct member columns. // Thus, if there is any null in the input, we must exclude the first column in the flattened // table of the input column from searching because that column is the materialized bitmask of // the input structs column. 
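As a usage-level illustration of the struct path described in the comment above (this snippet is not part of the patch; names are illustrative), searching a STRUCT haystack for a struct scalar looks like this at the public API, with the flattening and bitmask-column exclusion handled internally:

#include <cudf/scalar/scalar.hpp>
#include <cudf/search.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

bool struct_contains_example()
{
  // haystack rows: {1, "a"}, {2, "b"}, {3, "c"}
  cudf::test::fixed_width_column_wrapper<int32_t> child0{1, 2, 3};
  cudf::test::strings_column_wrapper child1{"a", "b", "c"};
  cudf::test::structs_column_wrapper haystack{{child0, child1}};

  // needle row: {2, "b"} -- a struct scalar holding one row of member columns
  cudf::test::fixed_width_column_wrapper<int32_t> key0{2};
  cudf::test::strings_column_wrapper key1{"b"};
  cudf::struct_scalar needle(cudf::table_view{{key0, key1}});

  return cudf::contains(haystack, needle);  // true
}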
- auto const col_flattened_content = col_flattened.flattened_columns(); - auto const col_flattened_children = table_view{std::vector{ - col_flattened_content.begin() + static_cast(has_null_elements), - col_flattened_content.end()}}; + auto const haystack_flattened_content = haystack_flattened.flattened_columns(); + auto const haystack_flattened_children = table_view{std::vector{ + haystack_flattened_content.begin() + static_cast(has_null_elements), + haystack_flattened_content.end()}}; - auto const d_col_children_ptr = table_device_view::create(col_flattened_children, stream); - auto const d_val_ptr = table_device_view::create(val_flattened, stream); + auto const d_haystack_children_ptr = + table_device_view::create(haystack_flattened_children, stream); + auto const d_needle_ptr = table_device_view::create(needle_flattened, stream); auto const start_iter = thrust::make_counting_iterator(0); - auto const end_iter = start_iter + col.size(); - auto const comp = row_equality_comparator( - nullate::DYNAMIC{has_null_elements}, *d_col_children_ptr, *d_val_ptr, null_equality::EQUAL); + auto const end_iter = start_iter + haystack.size(); + auto const comp = row_equality_comparator(nullate::DYNAMIC{has_null_elements}, + *d_haystack_children_ptr, + *d_needle_ptr, + null_equality::EQUAL); auto const found_iter = thrust::find_if( rmm::exec_policy(stream), start_iter, end_iter, [comp] __device__(auto const idx) { - return comp(idx, 0); // compare col[idx] == val[0]. + return comp(idx, 0); // compare haystack[idx] == val[0]. }); return found_iter != end_iter; } template <> -bool contains_scalar_dispatch::operator()(column_view const& col, - scalar const& value, +bool contains_scalar_dispatch::operator()(column_view const& haystack, + scalar const& needle, rmm::cuda_stream_view stream) { - auto dict_col = cudf::dictionary_column_view(col); - // first, find the value in the dictionary's key set - auto index = cudf::dictionary::detail::get_index(dict_col, value, stream); + auto dict_col = cudf::dictionary_column_view(haystack); + // first, find the needle in the dictionary's key set + auto index = cudf::dictionary::detail::get_index(dict_col, needle, stream); // if found, check the index is actually in the indices column return index->is_valid(stream) ? 
cudf::type_dispatcher(dict_col.indices().type(), contains_scalar_dispatch{}, @@ -251,12 +236,13 @@ bool contains_scalar_dispatch::operator()(column_view const& } // namespace namespace detail { -bool contains(column_view const& col, scalar const& value, rmm::cuda_stream_view stream) +bool contains(column_view const& haystack, scalar const& needle, rmm::cuda_stream_view stream) { - if (col.is_empty()) { return false; } - if (not value.is_valid(stream)) { return col.has_nulls(); } + if (haystack.is_empty()) { return false; } + if (not needle.is_valid(stream)) { return haystack.has_nulls(); } - return cudf::type_dispatcher(col.type(), contains_scalar_dispatch{}, col, value, stream); + return cudf::type_dispatcher( + haystack.type(), contains_scalar_dispatch{}, haystack, needle, stream); } struct multi_contains_dispatch { @@ -267,44 +253,44 @@ struct multi_contains_dispatch { rmm::mr::device_memory_resource* mr) { std::unique_ptr result = make_numeric_column(data_type{type_to_id()}, - haystack.size(), - copy_bitmask(haystack), - haystack.null_count(), + needles.size(), + copy_bitmask(needles), + needles.null_count(), stream, mr); - if (haystack.is_empty()) { return result; } + if (needles.is_empty()) { return result; } mutable_column_view result_view = result.get()->mutable_view(); - if (needles.is_empty()) { + if (haystack.is_empty()) { thrust::fill( rmm::exec_policy(stream), result_view.begin(), result_view.end(), false); return result; } - auto hash_set = cudf::detail::unordered_multiset::create(needles, stream); + auto hash_set = cudf::detail::unordered_multiset::create(haystack, stream); auto device_hash_set = hash_set.to_device(); - auto d_haystack_ptr = column_device_view::create(haystack, stream); - auto d_haystack = *d_haystack_ptr; + auto d_needles_ptr = column_device_view::create(needles, stream); + auto d_needles = *d_needles_ptr; - if (haystack.has_nulls()) { + if (needles.has_nulls()) { thrust::transform(rmm::exec_policy(stream), thrust::make_counting_iterator(0), - thrust::make_counting_iterator(haystack.size()), + thrust::make_counting_iterator(needles.size()), result_view.begin(), - [device_hash_set, d_haystack] __device__(size_t index) { - return d_haystack.is_null_nocheck(index) || - device_hash_set.contains(d_haystack.element(index)); + [device_hash_set, d_needles] __device__(size_t index) { + return d_needles.is_null_nocheck(index) || + device_hash_set.contains(d_needles.element(index)); }); } else { thrust::transform(rmm::exec_policy(stream), thrust::make_counting_iterator(0), - thrust::make_counting_iterator(haystack.size()), + thrust::make_counting_iterator(needles.size()), result_view.begin(), - [device_hash_set, d_haystack] __device__(size_t index) { - return device_hash_set.contains(d_haystack.element(index)); + [device_hash_set, d_needles] __device__(size_t index) { + return device_hash_set.contains(d_needles.element(index)); }); } @@ -336,10 +322,10 @@ std::unique_ptr multi_contains_dispatch::operator()( dictionary_column_view const haystack(haystack_in); dictionary_column_view const needles(needles_in); // first combine keys so both dictionaries have the same set - auto haystack_matched = dictionary::detail::add_keys(haystack, needles.keys(), stream); - auto const haystack_view = dictionary_column_view(haystack_matched->view()); - auto needles_matched = dictionary::detail::set_keys(needles, haystack_view.keys(), stream); + auto needles_matched = dictionary::detail::add_keys(needles, haystack.keys(), stream); auto const needles_view = 
dictionary_column_view(needles_matched->view()); + auto haystack_matched = dictionary::detail::set_keys(haystack, needles_view.keys(), stream); + auto const haystack_view = dictionary_column_view(haystack_matched->view()); // now just use the indices for the contains column_view const haystack_indices = haystack_view.get_indices_annotated(); @@ -363,56 +349,56 @@ std::unique_ptr contains(column_view const& haystack, haystack.type(), multi_contains_dispatch{}, haystack, needles, stream, mr); } -std::unique_ptr lower_bound(table_view const& t, - table_view const& values, +std::unique_ptr lower_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return search_ordered(t, values, true, column_order, null_precedence, stream, mr); + return search_ordered(haystack, needles, true, column_order, null_precedence, stream, mr); } -std::unique_ptr upper_bound(table_view const& t, - table_view const& values, +std::unique_ptr upper_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return search_ordered(t, values, false, column_order, null_precedence, stream, mr); + return search_ordered(haystack, needles, false, column_order, null_precedence, stream, mr); } } // namespace detail // external APIs -std::unique_ptr lower_bound(table_view const& t, - table_view const& values, +std::unique_ptr lower_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); return detail::lower_bound( - t, values, column_order, null_precedence, rmm::cuda_stream_default, mr); + haystack, needles, column_order, null_precedence, rmm::cuda_stream_default, mr); } -std::unique_ptr upper_bound(table_view const& t, - table_view const& values, +std::unique_ptr upper_bound(table_view const& haystack, + table_view const& needles, std::vector const& column_order, std::vector const& null_precedence, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); return detail::upper_bound( - t, values, column_order, null_precedence, rmm::cuda_stream_default, mr); + haystack, needles, column_order, null_precedence, rmm::cuda_stream_default, mr); } -bool contains(column_view const& col, scalar const& value) +bool contains(column_view const& haystack, scalar const& needle) { CUDF_FUNC_RANGE(); - return detail::contains(col, value, rmm::cuda_stream_default); + return detail::contains(haystack, needle, rmm::cuda_stream_default); } std::unique_ptr contains(column_view const& haystack, diff --git a/cpp/tests/search/search_dictionary_test.cpp b/cpp/tests/search/search_dictionary_test.cpp index 6b1caa5ed6f..9eba259ee39 100644 --- a/cpp/tests/search/search_dictionary_test.cpp +++ b/cpp/tests/search/search_dictionary_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -88,7 +88,7 @@ TEST_F(DictionarySearchTest, contains_dictionary) EXPECT_FALSE(cudf::contains(column, string_scalar{"28"})); cudf::test::dictionary_column_wrapper needles({"00", "17", "23", "27"}); - fixed_width_column_wrapper expect{1, 1, 1, 1, 1, 1, 0}; + fixed_width_column_wrapper expect{1, 1, 1, 0}; auto result = cudf::contains(column, needles); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expect); } @@ -101,7 +101,7 @@ TEST_F(DictionarySearchTest, contains_nullable_dictionary) EXPECT_FALSE(cudf::contains(column, numeric_scalar{28})); cudf::test::dictionary_column_wrapper needles({0, 17, 23, 27}); - fixed_width_column_wrapper expect({1, 0, 1, 1, 1, 1, 0}, {1, 0, 1, 1, 1, 1, 1}); + fixed_width_column_wrapper expect{1, 1, 1, 0}; auto result = cudf::contains(column, needles); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expect); } diff --git a/cpp/tests/search/search_test.cpp b/cpp/tests/search/search_test.cpp index 0a2533cd5f3..169eaffa41a 100644 --- a/cpp/tests/search/search_test.cpp +++ b/cpp/tests/search/search_test.cpp @@ -1627,7 +1627,7 @@ TEST_F(SearchTest, multi_contains_some) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{17, 19, 45, 72}; - fixed_width_column_wrapper expect{0, 0, 1, 1, 0, 0, 0}; + fixed_width_column_wrapper expect{1, 1, 0, 0}; auto result = cudf::contains(haystack, needles); @@ -1641,7 +1641,7 @@ TEST_F(SearchTest, multi_contains_none) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{2, 3}; - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1657,7 +1657,7 @@ TEST_F(SearchTest, multi_contains_some_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 1, 1, 0, 0, 0}; + fixed_width_column_wrapper expect{1, 1, 0, 0}; auto result = cudf::contains(haystack, needles); @@ -1673,7 +1673,7 @@ TEST_F(SearchTest, multi_contains_none_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1688,7 +1688,7 @@ TEST_F(SearchTest, multi_contains_some_with_nulls) {1, 1, 0, 1, 1, 1, 1}}; fixed_width_column_wrapper needles{{17, 19, 23, 72}, {1, 0, 1, 1}}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 1, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0, 1, 0}, {1, 0, 1, 1}}; auto result = cudf::contains(haystack, needles); @@ -1703,7 +1703,7 @@ TEST_F(SearchTest, multi_contains_none_with_nulls) {1, 1, 0, 1, 1, 1, 1}}; fixed_width_column_wrapper needles{{17, 19, 24, 72}, {1, 0, 1, 1}}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 0, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0, 0, 0}, {1, 0, 1, 1}}; auto result = cudf::contains(haystack, needles); @@ -1715,7 +1715,7 @@ TEST_F(SearchTest, multi_contains_some_string_with_nulls) std::vector h_haystack_strings{"0", "1", nullptr, "19", "23", "29", "71"}; std::vector h_needles_strings{"17", "23", nullptr, "72"}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 1, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 1, 0, 0}, {1, 1, 0, 1}}; cudf::test::strings_column_wrapper haystack( h_haystack_strings.begin(), @@ -1739,7 +1739,7 @@ TEST_F(SearchTest, 
multi_contains_none_string_with_nulls) std::vector h_haystack_strings{"0", "1", nullptr, "19", "23", "29", "71"}; std::vector h_needles_strings{"2", nullptr}; - fixed_width_column_wrapper expect{{0, 0, 0, 0, 0, 0, 0}, {1, 1, 0, 1, 1, 1, 1}}; + fixed_width_column_wrapper expect{{0, 0}, {1, 0}}; cudf::test::strings_column_wrapper haystack( h_haystack_strings.begin(), @@ -1765,7 +1765,7 @@ TEST_F(SearchTest, multi_contains_empty_column) fixed_width_column_wrapper haystack{}; fixed_width_column_wrapper needles{2, 3}; - fixed_width_column_wrapper expect{}; + fixed_width_column_wrapper expect{0, 0}; auto result = cudf::contains(haystack, needles); @@ -1781,7 +1781,7 @@ TEST_F(SearchTest, multi_contains_empty_column_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{}; + fixed_width_column_wrapper expect{0, 0, 0, 0}; auto result = cudf::contains(haystack, needles); @@ -1795,7 +1795,7 @@ TEST_F(SearchTest, multi_contains_empty_input_set) fixed_width_column_wrapper haystack{0, 1, 17, 19, 23, 29, 71}; fixed_width_column_wrapper needles{}; - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{}; auto result = cudf::contains(haystack, needles); @@ -1811,7 +1811,7 @@ TEST_F(SearchTest, multi_contains_empty_input_set_string) cudf::test::strings_column_wrapper needles(h_needles_strings.begin(), h_needles_strings.end()); - fixed_width_column_wrapper expect{0, 0, 0, 0, 0, 0, 0}; + fixed_width_column_wrapper expect{}; auto result = cudf::contains(haystack, needles); diff --git a/docs/cudf/source/api_docs/io.rst b/docs/cudf/source/api_docs/io.rst index 7e4d1b48c93..a52667cd3e4 100644 --- a/docs/cudf/source/api_docs/io.rst +++ b/docs/cudf/source/api_docs/io.rst @@ -36,6 +36,10 @@ Parquet read_parquet DataFrame.to_parquet cudf.io.parquet.read_parquet_metadata + :template: autosummary/class_with_autosummary.rst + + cudf.io.parquet.ParquetDatasetWriter + ORC ~~~ diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index e871da18966..9f07b130a83 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -1769,22 +1769,23 @@ public boolean contains(Scalar needle) { } /** - * Returns a new ColumnVector of {@link DType#BOOL8} elements containing true if the corresponding - * entry in haystack is contained in needles and false if it is not. The caller will be responsible - * for the lifecycle of the new vector. + * Returns a new column of {@link DType#BOOL8} elements having the same size as this column, + * each row value is true if the corresponding entry in this column is contained in the + * given searchSpace column and false if it is not. + * The caller will be responsible for the lifecycle of the new vector. 
* * example: * - * haystack = { 10, 20, 30, 40, 50 } - * needles = { 20, 40, 60, 80 } + * col = { 10, 20, 30, 40, 50 } + * searchSpace = { 20, 40, 60, 80 } * * result = { false, true, false, true, false } * - * @param needles + * @param searchSpace * @return A new ColumnVector of type {@link DType#BOOL8} */ - public final ColumnVector contains(ColumnView needles) { - return new ColumnVector(containsVector(getNativeView(), needles.getNativeView())); + public final ColumnVector contains(ColumnView searchSpace) { + return new ColumnVector(containsVector(getNativeView(), searchSpace.getNativeView())); } /** @@ -4080,7 +4081,7 @@ private static native long segmentedGather(long sourceColumnHandle, long gatherM private static native boolean containsScalar(long columnViewHaystack, long scalarHandle) throws CudfException; - private static native long containsVector(long columnViewHaystack, long columnViewNeedles) throws CudfException; + private static native long containsVector(long valuesHandle, long searchSpaceHandle) throws CudfException; private static native long transform(long viewHandle, String udf, boolean isPtx); diff --git a/java/src/main/native/src/ColumnViewJni.cpp b/java/src/main/native/src/ColumnViewJni.cpp index e074180c312..b33769bdc1b 100644 --- a/java/src/main/native/src/ColumnViewJni.cpp +++ b/java/src/main/native/src/ColumnViewJni.cpp @@ -1166,15 +1166,18 @@ JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ColumnView_containsScalar(JNIEnv } JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnView_containsVector(JNIEnv *env, jobject j_object, - jlong j_haystack_handle, - jlong j_needle_handle) { - JNI_NULL_CHECK(env, j_haystack_handle, "haystack vector is null", false); - JNI_NULL_CHECK(env, j_needle_handle, "needle vector is null", false); + jlong j_values_handle, + jlong j_search_space_handle) { + JNI_NULL_CHECK(env, j_values_handle, "values vector is null", false); + JNI_NULL_CHECK(env, j_search_space_handle, "search_space vector is null", false); try { cudf::jni::auto_set_device(env); - cudf::column_view *haystack = reinterpret_cast(j_haystack_handle); - cudf::column_view *needle = reinterpret_cast(j_needle_handle); - return release_as_jlong(cudf::contains(*haystack, *needle)); + auto const search_space_ptr = + reinterpret_cast(j_search_space_handle); + auto const values_ptr = reinterpret_cast(j_values_handle); + + // The C++ API `cudf::contains` requires that the search space is the first parameter. + return release_as_jlong(cudf::contains(*search_space_ptr, *values_ptr)); } CATCH_STD(env, 0); } diff --git a/python/cudf/cudf/_lib/cpp/search.pxd b/python/cudf/cudf/_lib/cpp/search.pxd index 4df73881ea5..8baef0aa1b9 100644 --- a/python/cudf/cudf/_lib/cpp/search.pxd +++ b/python/cudf/cudf/_lib/cpp/search.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. 
from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector @@ -12,15 +12,15 @@ from cudf._lib.cpp.table.table_view cimport table_view cdef extern from "cudf/search.hpp" namespace "cudf" nogil: cdef unique_ptr[column] lower_bound( - table_view t, - table_view values, + table_view haystack, + table_view needles, vector[libcudf_types.order] column_order, vector[libcudf_types.null_order] null_precedence, ) except + cdef unique_ptr[column] upper_bound( - table_view t, - table_view values, + table_view haystack, + table_view needles, vector[libcudf_types.order] column_order, vector[libcudf_types.null_order] null_precedence, ) except + diff --git a/python/cudf/cudf/_lib/scalar.pyx b/python/cudf/cudf/_lib/scalar.pyx index 71ac022ba2d..6309720706b 100644 --- a/python/cudf/cudf/_lib/scalar.pyx +++ b/python/cudf/cudf/_lib/scalar.pyx @@ -27,6 +27,7 @@ from cudf._lib.types import ( duration_unit_map, ) from cudf.core.dtypes import ListDtype, StructDtype +from cudf.core.missing import NA from cudf._lib.column cimport Column from cudf._lib.cpp.column.column_view cimport column_view @@ -170,7 +171,7 @@ cdef class DeviceScalar: return self.get_raw_ptr()[0].is_valid() def __repr__(self): - if self.value is cudf.NA: + if self.value is NA: return ( f"{self.__class__.__name__}" f"({self.value}, {repr(self.dtype)})" @@ -356,7 +357,7 @@ cdef _set_struct_from_pydict(unique_ptr[scalar]& s, else: pyarrow_table = pa.Table.from_arrays( [ - pa.array([cudf.NA], from_pandas=True, type=f.type) + pa.array([NA], from_pandas=True, type=f.type) for f in arrow_schema ], names=columns @@ -371,7 +372,7 @@ cdef _set_struct_from_pydict(unique_ptr[scalar]& s, cdef _get_py_dict_from_struct(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA cdef table_view struct_table_view = (s.get()).view() column_names = [str(i) for i in range(struct_table_view.num_columns())] @@ -386,7 +387,7 @@ cdef _set_list_from_pylist(unique_ptr[scalar]& s, object dtype, bool valid=True): - value = value if valid else [cudf.NA] + value = value if valid else [NA] cdef Column col if isinstance(dtype.element_type, ListDtype): pa_type = dtype.element_type.to_arrow() @@ -404,7 +405,7 @@ cdef _set_list_from_pylist(unique_ptr[scalar]& s, cdef _get_py_list_from_list(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA cdef column_view list_col_view = (s.get()).view() cdef Column list_col = Column.from_column_view(list_col_view, None) @@ -416,14 +417,14 @@ cdef _get_py_list_from_list(unique_ptr[scalar]& s): cdef _get_py_string_from_string(unique_ptr[scalar]& s): if not s.get()[0].is_valid(): - return cudf.NA + return NA return (s.get())[0].to_string().decode() cdef _get_np_scalar_from_numeric(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -456,7 +457,7 @@ cdef _get_np_scalar_from_numeric(unique_ptr[scalar]& s): cdef _get_py_decimal_from_fixed_point(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -480,7 +481,7 @@ cdef _get_np_scalar_from_timestamp64(unique_ptr[scalar]& s): cdef scalar* s_ptr = s.get() if not s_ptr[0].is_valid(): - return cudf.NA + return NA cdef libcudf_types.data_type cdtype = s_ptr[0].type() @@ -571,7 +572,7 @@ def as_device_scalar(val, dtype=None): def _is_null_host_scalar(slr): - if slr is None or slr is cudf.NA: + if slr is None or slr is 
NA: return True elif isinstance(slr, (np.datetime64, np.timedelta64)) and np.isnat(slr): return True @@ -603,5 +604,5 @@ def _nested_na_replace(input_list): if isinstance(value, list): _nested_na_replace(value) elif value is None: - input_list[idx] = cudf.NA + input_list[idx] = NA return input_list diff --git a/python/cudf/cudf/core/_internals/where.py b/python/cudf/cudf/core/_internals/where.py index 59e7d629092..bc01752a2b4 100644 --- a/python/cudf/cudf/core/_internals/where.py +++ b/python/cudf/cudf/core/_internals/where.py @@ -12,6 +12,7 @@ from cudf.core.dataframe import DataFrame from cudf.core.frame import Frame from cudf.core.index import Index +from cudf.core.missing import NA from cudf.core.series import Series from cudf.core.single_column_frame import SingleColumnFrame @@ -28,9 +29,7 @@ def _normalize_scalars(col: ColumnBase, other: ScalarLike) -> ScalarLike: f"{type(other).__name__} to {col.dtype.name}" ) - return cudf.Scalar( - other, dtype=col.dtype if other in {None, cudf.NA} else None - ) + return cudf.Scalar(other, dtype=col.dtype if other in {None, NA} else None) def _check_and_cast_columns_with_other( diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index e1d91e6d0c0..47a2e3489e8 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -68,6 +68,7 @@ ListDtype, StructDtype, ) +from cudf.core.missing import NA from cudf.core.mixins import BinaryOperand, Reducible from cudf.utils.dtypes import ( cudf_dtype_from_pa_type, @@ -499,7 +500,7 @@ def __setitem__(self, key: Any, value: Any): self._mimic_inplace(out, inplace=True) def _wrap_binop_normalization(self, other): - if other is cudf.NA or other is None: + if other is NA or other is None: return cudf.Scalar(other, dtype=self.dtype) if isinstance(other, np.ndarray) and other.ndim == 0: other = other.item() diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index 30e418f0825..e8a5638f07a 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -30,6 +30,7 @@ from cudf.core.column import ColumnBase, as_column, column from cudf.core.column.methods import ColumnMethods, ParentType from cudf.core.dtypes import ListDtype +from cudf.core.missing import NA class ListColumn(ColumnBase): @@ -91,7 +92,7 @@ def __setitem__(self, key, value): if isinstance(value, cudf.Scalar): if value.dtype != self.dtype: raise TypeError("list nesting level mismatch") - elif value is cudf.NA: + elif value is NA: value = cudf.Scalar(value, dtype=self.dtype) else: raise ValueError(f"Can not set {value} into ListColumn") @@ -354,7 +355,7 @@ def get( index = as_column(index) out = extract_element_column(self._column, as_column(index)) - if not (default is None or default is cudf.NA): + if not (default is None or default is NA): # determine rows for which `index` is out-of-bounds lengths = count_elements(self._column) out_of_bounds_mask = (np.negative(index) > lengths) | ( diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index 659bb58d790..bb7711a3ead 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -11,6 +11,7 @@ from cudf import _lib as libcudf from cudf._typing import ScalarLike from cudf.core.column import ColumnBase +from cudf.core.missing import NA from cudf.core.mixins import Scannable @@ -116,7 +117,7 @@ def quantile( scalar_result = result.element_indexing(0) 
return ( cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - if scalar_result is cudf.NA + if scalar_result is NA else scalar_result ) return result diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index 70097f15372..09a4754f519 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -5407,7 +5407,7 @@ def fillna( def _find_first_and_last(self, value: ScalarLike) -> Tuple[int, int]: found_indices = libcudf.search.contains( - self, column.as_column([value], dtype=self.dtype) + column.as_column([value], dtype=self.dtype), self ) found_indices = libcudf.unary.cast(found_indices, dtype=np.int32) first = column.as_column(found_indices).find_first_value(np.int32(1)) diff --git a/python/cudf/cudf/core/column/struct.py b/python/cudf/cudf/core/column/struct.py index ed5e1c9450d..fa834ae8a5a 100644 --- a/python/cudf/cudf/core/column/struct.py +++ b/python/cudf/cudf/core/column/struct.py @@ -10,6 +10,7 @@ from cudf.core.column import ColumnBase, build_struct_column from cudf.core.column.methods import ColumnMethods from cudf.core.dtypes import StructDtype +from cudf.core.missing import NA class StructColumn(ColumnBase): @@ -102,7 +103,7 @@ def __setitem__(self, key, value): if isinstance(value, dict): # filling in fields not in dict for field in self.dtype.fields: - value[field] = value.get(field, cudf.NA) + value[field] = value.get(field, NA) value = cudf.Scalar(value, self.dtype) super().__setitem__(key, value) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index a3e2f40b28e..0c3dc82719e 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -76,6 +76,7 @@ _indices_from_labels, doc_reset_index_template, ) +from cudf.core.missing import NA from cudf.core.multiindex import MultiIndex from cudf.core.resample import DataFrameResampler from cudf.core.series import Series @@ -364,9 +365,7 @@ def _setitem_tuple_arg(self, key, value): scatter_map = _indices_from_labels(self._frame, key[0]) for col in columns_df._column_names: columns_df[col][scatter_map] = ( - value._data[col] - if col in value_column_names - else cudf.NA + value._data[col] if col in value_column_names else NA ) else: @@ -479,7 +478,7 @@ def _setitem_tuple_arg(self, key, value): value_column_names = set(value._column_names) for col in columns_df._column_names: columns_df[col][key[0]] = ( - value._data[col] if col in value_column_names else cudf.NA + value._data[col] if col in value_column_names else NA ) else: @@ -3867,8 +3866,8 @@ def applymap( # bytecode to generate the equivalent PTX # as a null-ignoring version of the function def _func(x): # pragma: no cover - if x is cudf.NA: - return cudf.NA + if x is NA: + return NA else: return devfunc(x) diff --git a/python/cudf/cudf/io/__init__.py b/python/cudf/cudf/io/__init__.py index 15404b26042..4ec84ecbc74 100644 --- a/python/cudf/cudf/io/__init__.py +++ b/python/cudf/cudf/io/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. 
from cudf.io.avro import read_avro from cudf.io.csv import read_csv, to_csv from cudf.io.dlpack import from_dlpack @@ -9,6 +9,7 @@ from cudf.io.parquet import ( merge_parquet_filemetadata, read_parquet, + ParquetDatasetWriter, read_parquet_metadata, write_to_dataset, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index d7e85d72ba0..a9398a3139f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -1,6 +1,8 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. import math +import shutil +import tempfile import warnings from collections import defaultdict from contextlib import ExitStack @@ -232,12 +234,15 @@ def _process_dataset( filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset + # TODO: Remove the if len(paths) workaround after following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 dataset = ds.dataset( - paths, + source=paths[0] if len(paths) == 1 else paths, filesystem=fs, format="parquet", partitioning="hive", ) + file_list = dataset.files if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") @@ -837,6 +842,67 @@ def _parse_bytes(s): class ParquetDatasetWriter: + """ + Write a parquet file or dataset incrementally + + Parameters + ---------- + path : str + A local directory path or S3 URL. Will be used as root directory + path while writing a partitioned dataset. + partition_cols : list + Column names by which to partition the dataset + Columns are partitioned in the order they are given + index : bool, default None + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. If ``None``, + index(es) other than RangeIndex will be saved as columns. + compression : {'snappy', None}, default 'snappy' + Name of the compression to use. Use ``None`` for no compression. + statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' + Level at which column statistics should be included in file. + max_file_size : int or str, default None + A file size that cannot be exceeded by the writer. + It is in bytes, if the input is int. + Size can also be a str in form or "10 MB", "1 GB", etc. + If this parameter is used, it is mandatory to pass + `file_name_prefix`. + file_name_prefix : str + This is a prefix to file names generated only when + `max_file_size` is specified. + + + Examples + -------- + Using a context + + >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) + >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) + >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: + ... cw.write_table(df1) + ... cw.write_table(df2) + + By manually calling ``close()`` + + >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) + >>> cw.write_table(df1) + >>> cw.write_table(df2) + >>> cw.close() + + Both the methods will generate the same directory structure + + .. code-block:: none + + dataset/ + a=1 + .parquet + a=2 + .parquet + a=3 + .parquet + + """ + @_cudf_nvtx_annotate def __init__( self, @@ -847,68 +913,15 @@ def __init__( statistics="ROWGROUP", max_file_size=None, file_name_prefix=None, + **kwargs, ) -> None: - """ - Write a parquet file or dataset incrementally - - Parameters - ---------- - path : str - File path or Root Directory path. Will be used as Root Directory - path while writing a partitioned dataset. 
- partition_cols : list - Column names by which to partition the dataset - Columns are partitioned in the order they are given - index : bool, default None - If ``True``, include the dataframe’s index(es) in the file output. - If ``False``, they will not be written to the file. If ``None``, - index(es) other than RangeIndex will be saved as columns. - compression : {'snappy', None}, default 'snappy' - Name of the compression to use. Use ``None`` for no compression. - statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' - Level at which column statistics should be included in file. - max_file_size : int or str, default None - A file size that cannot be exceeded by the writer. - It is in bytes, if the input is int. - Size can also be a str in form or "10 MB", "1 GB", etc. - If this parameter is used, it is mandatory to pass - `file_name_prefix`. - file_name_prefix : str - This is a prefix to file names generated only when - `max_file_size` is specified. - - - Examples - ________ - Using a context - - >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) - >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) - >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: - ... cw.write_table(df1) - ... cw.write_table(df2) - - By manually calling ``close()`` - - >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) - >>> cw.write_table(df1) - >>> cw.write_table(df2) - >>> cw.close() - - Both the methods will generate the same directory structure - - .. code-block:: bash - - dataset/ - a=1 - .parquet - a=2 - .parquet - a=3 - .parquet + if isinstance(path, str) and path.startswith("s3://"): + self.fs_meta = {"is_s3": True, "actual_path": path} + self.path = tempfile.TemporaryDirectory().name + else: + self.fs_meta = {} + self.path = path - """ - self.path = path self.common_args = { "index": index, "compression": compression, @@ -923,6 +936,7 @@ def __init__( # Map of partition_col values to their ParquetWriter's index # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} + self.kwargs = kwargs self.filename = file_name_prefix self.max_file_size = max_file_size if max_file_size is not None: @@ -1051,18 +1065,19 @@ def write_table(self, df): ] cw.write_table(grouped_df, this_cw_part_info) - # Create new cw for unhandled paths encountered in this write_table - new_paths, part_info, meta_paths = zip(*new_cw_paths) - self._chunked_writers.append( - ( - ParquetWriter(new_paths, **self.common_args), - new_paths, - meta_paths, + if new_cw_paths: + # Create new cw for unhandled paths encountered in this write_table + new_paths, part_info, meta_paths = zip(*new_cw_paths) + self._chunked_writers.append( + ( + ParquetWriter(new_paths, **self.common_args), + new_paths, + meta_paths, + ) ) - ) - new_cw_idx = len(self._chunked_writers) - 1 - self.path_cw_map.update({k: new_cw_idx for k in new_paths}) - self._chunked_writers[-1][0].write_table(grouped_df, part_info) + new_cw_idx = len(self._chunked_writers) - 1 + self.path_cw_map.update({k: new_cw_idx for k in new_paths}) + self._chunked_writers[-1][0].write_table(grouped_df, part_info) @_cudf_nvtx_annotate def close(self, return_metadata=False): @@ -1076,6 +1091,15 @@ def close(self, return_metadata=False): for cw, _, meta_path in self._chunked_writers ] + if self.fs_meta.get("is_s3", False): + local_path = self.path + s3_path = self.fs_meta["actual_path"] + s3_file, _ = ioutils._get_filesystem_and_paths( + s3_path, **self.kwargs + ) + s3_file.put(local_path, 
s3_path, recursive=True) + shutil.rmtree(self.path) + if return_metadata: return ( merge_parquet_filemetadata(metadata) diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index b134d2b26e9..070e4649c7b 100644 --- a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -20,6 +20,7 @@ is_struct_dtype, ) from cudf.core._compat import PANDAS_GE_110 +from cudf.core.missing import NA def dtype_can_compare_equal_to_other(dtype): @@ -290,7 +291,7 @@ def assert_column_equal( def null_safe_scalar_equals(left, right): - if left in {cudf.NA, np.nan} or right in {cudf.NA, np.nan}: + if left in {NA, np.nan} or right in {NA, np.nan}: return left is right return left == right diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index e8d93caaf55..b754429555d 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -1,9 +1,7 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. import os -import shlex -import subprocess -import time +import socket from contextlib import contextmanager from io import BytesIO @@ -18,11 +16,27 @@ import cudf from cudf.testing._utils import assert_eq -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") -requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer + + +@pytest.fixture(scope="session") +def endpoint_ip(): + return "127.0.0.1" + + +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. + sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + @contextmanager def ensure_safe_environment_variables(): @@ -40,7 +54,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_ip, endpoint_port): """ Fixture to set up moto server in separate process """ @@ -49,47 +63,25 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" - - proc = subprocess.Popen( - shlex.split(f"moto_server s3 -p {endpoint_port}"), - ) + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" - timeout = 5 - while timeout > 0: - try: - # OK to go once server is accepting connections - r = requests.get(endpoint_uri) - if r.ok: - break - except Exception: - pass - timeout -= 0.1 - time.sleep(0.1) + server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) + server.start() yield endpoint_uri - - proc.terminate() - proc.wait() + server.stop() @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_ip, endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} @@ -141,13 +133,13 
@@ def pdf_ext(scope="module"): def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): # Write to buffer fname = "test_csv_reader.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=False, @@ -155,9 +147,9 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): assert_eq(pdf, got) # Use Arrow PythonFile object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=True, @@ -168,13 +160,13 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): def test_read_csv_arrow_nativefile(s3_base, s3so, pdf): # Write to buffer fname = "test_csv_reader_arrow_nativefile.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_csv(fil) assert_eq(pdf, got) @@ -187,13 +179,13 @@ def test_read_csv_byte_range( ): # Write to buffer fname = "test_csv_reader_byte_range.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, byte_range=(74, 73), bytes_per_thread=bytes_per_thread, @@ -209,19 +201,19 @@ def test_read_csv_byte_range( def test_write_csv(s3_base, s3so, pdf, chunksize): # Write to buffer fname = "test_csv_writer.csv" - bname = "csv" + bucket = "csv" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", index=False, chunksize=chunksize, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname}") # TODO: Update to use `storage_options` from pandas v1.2.0 - got = pd.read_csv(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_csv(s3fs.open(f"s3://{bucket}/{fname}")) assert_eq(pdf, got) @@ -240,15 +232,15 @@ def test_read_parquet( use_python_file_object, ): fname = "test_parquet_reader.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", open_file_options=( {"precache_options": {"method": precache}} if use_python_file_object @@ -264,11 +256,11 @@ def test_read_parquet( # Check fsspec file-object handling buffer.seek(0) - with 
s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): - fs = get_fs_token_paths(f"s3://{bname}/{fname}", storage_options=s3so)[ - 0 - ] - with fs.open(f"s3://{bname}/{fname}", mode="rb") as f: + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): + fs = get_fs_token_paths( + f"s3://{bucket}/{fname}", storage_options=s3so + )[0] + with fs.open(f"s3://{bucket}/{fname}", mode="rb") as f: got2 = cudf.read_parquet( f, bytes_per_thread=bytes_per_thread, @@ -290,7 +282,7 @@ def test_read_parquet_ext( index, ): fname = "test_parquet_reader_ext.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() if index: @@ -300,9 +292,9 @@ def test_read_parquet_ext( # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, footer_sample_size=3200, @@ -323,15 +315,15 @@ def test_read_parquet_ext( def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): # Write to buffer fname = "test_parquet_reader_arrow_nativefile.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_parquet(fil, columns=columns) expect = pdf[columns] if columns else pdf @@ -341,14 +333,14 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): @pytest.mark.parametrize("precache", [None, "parquet"]) def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): fname = "test_parquet_reader_filters.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf_ext.to_parquet(path=buffer) buffer.seek(0) filters = [("String", "==", "Omega")] - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, filters=filters, open_file_options={"precache_options": {"method": precache}}, @@ -360,25 +352,38 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): @pytest.mark.parametrize("partition_cols", [None, ["String"]]) def test_write_parquet(s3_base, s3so, pdf, partition_cols): - fname = "test_parquet_writer.parquet" - bname = "parquet" + fname_cudf = "test_parquet_writer_cudf" + fname_pandas = "test_parquet_writer_pandas" + bucket = "parquet" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname_cudf}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname_cudf}") + pdf.to_parquet( + f"s3://{bucket}/{fname_pandas}", + partition_cols=partition_cols, + storage_options=s3so, + ) + assert s3fs.exists(f"s3://{bucket}/{fname_pandas}") - got = pd.read_parquet(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_parquet( + f"s3://{bucket}/{fname_pandas}", 
storage_options=s3so + ) + expect = cudf.read_parquet( + f"s3://{bucket}/{fname_cudf}", storage_options=s3so + ) - assert_eq(pdf, got) + assert_eq(expect, got) def test_read_json(s3_base, s3so): fname = "test_json_reader.json" - bname = "json" + bucket = "json" # TODO: After following bug is fixed switch # back to using bytes: # https://github.com/pandas-dev/pandas/issues/46935 @@ -396,9 +401,9 @@ def test_read_json(s3_base, s3so): '{"amount": 400, "name": "Dennis"}\n' ) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_json( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", engine="cudf", orient="records", lines=True, @@ -414,15 +419,15 @@ def test_read_json(s3_base, s3so): def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_orc( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", columns=columns, storage_options=s3so, use_python_file_object=use_python_file_object, @@ -437,17 +442,17 @@ def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_orc(fil, columns=columns) if columns: @@ -457,13 +462,51 @@ def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): def test_write_orc(s3_base, s3so, pdf): fname = "test_orc_writer.orc" - bname = "orc" + bucket = "orc" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: - gdf.to_orc(f"s3://{bname}/{fname}", storage_options=s3so) - assert s3fs.exists(f"s3://{bname}/{fname}") + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: + gdf.to_orc(f"s3://{bucket}/{fname}", storage_options=s3so) + assert s3fs.exists(f"s3://{bucket}/{fname}") - with s3fs.open(f"s3://{bname}/{fname}") as f: + with s3fs.open(f"s3://{bucket}/{fname}") as f: got = pa.orc.ORCFile(f).read().to_pandas() assert_eq(pdf, got) + + +def test_write_chunked_parquet(s3_base, s3so): + df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}) + df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]}) + dirname = "chunked_writer_directory" + bucket = "parquet" + from cudf.io.parquet import ParquetDatasetWriter + + with s3_context( + s3_base=s3_base, bucket=bucket, files={dirname: BytesIO()} + ) as s3fs: + with ParquetDatasetWriter( + f"s3://{bucket}/{dirname}", + partition_cols=["a"], + storage_options=s3so, + ) as cw: + cw.write_table(df1) + cw.write_table(df2) + + # TODO: Replace following workaround with: + # expect = 
cudf.read_parquet(f"s3://{bucket}/{dirname}/", + # storage_options=s3so) + # after the following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + + dfs = [] + for folder in {"a=1", "a=2", "a=3"}: + assert s3fs.exists(f"s3://{bucket}/{dirname}/{folder}") + for file in s3fs.ls(f"s3://{bucket}/{dirname}/{folder}"): + df = cudf.read_parquet("s3://" + file, storage_options=s3so) + dfs.append(df) + + actual = cudf.concat(dfs).astype("int64") + assert_eq( + actual.sort_values(["b"]).reset_index(drop=True), + cudf.concat([df1, df2]).sort_values(["b"]).reset_index(drop=True), + ) diff --git a/python/cudf/cudf/utils/dtypes.py b/python/cudf/cudf/utils/dtypes.py index 35c6fdc73f8..c2d9a57b72f 100644 --- a/python/cudf/cudf/utils/dtypes.py +++ b/python/cudf/cudf/utils/dtypes.py @@ -12,6 +12,7 @@ import cudf from cudf.core._compat import PANDAS_GE_120 +from cudf.core.missing import NA _NA_REP = "" @@ -591,7 +592,7 @@ def _can_cast(from_dtype, to_dtype): `np.can_cast` but with some special handling around cudf specific dtypes. """ - if from_dtype in {None, cudf.NA}: + if from_dtype in {None, NA}: return True if isinstance(from_dtype, type): from_dtype = cudf.dtype(from_dtype) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 684b1f71099..d137fac5fe3 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -35,6 +35,25 @@ ) +def _check_groupby_supported(func): + """ + Decorator for dask-cudf's groupby methods that returns the dask-cudf + method if the groupby object is supported, otherwise reverting to the + upstream Dask method + """ + + def wrapper(*args, **kwargs): + gb = args[0] + if _groupby_supported(gb): + return func(*args, **kwargs) + # note that we use upstream Dask's default kwargs for this call if + # none are specified; this shouldn't be an issue as those defaults are + # consistent with dask-cudf + return getattr(super(type(gb), gb), func.__name__)(*args[1:], **kwargs) + + return wrapper + + class CudfDataFrameGroupBy(DataFrameGroupBy): @_dask_cudf_nvtx_annotate def __init__(self, *args, **kwargs): @@ -65,6 +84,22 @@ def __getitem__(self, key): return g @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def count(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "count" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -72,13 +107,89 @@ def mean(self, split_every=None, split_out=1): {c: "mean" for c in self.obj.columns if c not in self.by}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def std(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "std" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def var(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "var" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + 
sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def sum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "sum" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def min(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "min" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, ) @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def max(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "max" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -86,10 +197,40 @@ def collect(self, split_every=None, split_out=1): {c: "collect" for c in self.obj.columns if c not in self.by}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def first(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "first" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def last(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {c: "last" for c in self.obj.columns if c not in self.by}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, ) @_dask_cudf_nvtx_annotate @@ -98,17 +239,7 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - if ( - isinstance(self.obj, DaskDataFrame) - and ( - isinstance(self.by, str) - or ( - isinstance(self.by, list) - and all(isinstance(x, str) for x in self.by) - ) - ) - and _is_supported(arg, SUPPORTED_AGGS) - ): + if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): if isinstance(self._meta.grouping.keys, cudf.MultiIndex): keys = self._meta.grouping.keys.names else: @@ -120,10 +251,10 @@ def aggregate(self, arg, split_every=None, split_out=1): arg, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, ) return super().aggregate( @@ -139,6 +270,22 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def count(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "count"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -146,13 +293,14 
@@ def mean(self, split_every=None, split_out=1): {self._slice: "mean"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported def std(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -160,13 +308,14 @@ def std(self, split_every=None, split_out=1): {self._slice: "std"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported def var(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -174,13 +323,59 @@ def var(self, split_every=None, split_out=1): {self._slice: "var"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def sum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "sum"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def min(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "min"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def max(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "max"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -188,10 +383,40 @@ def collect(self, split_every=None, split_out=1): {self._slice: "collect"}, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def first(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "first"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def last(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "last"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, )[self._slice] @_dask_cudf_nvtx_annotate @@ -203,21 +428,17 @@ def aggregate(self, arg, split_every=None, split_out=1): if not isinstance(arg, dict): arg = {self._slice: arg} - if ( - isinstance(self.obj, DaskDataFrame) - and isinstance(self.by, (str, list)) - and _is_supported(arg, SUPPORTED_AGGS) - ): + if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): return groupby_agg( self.obj, self.by, arg, split_every=split_every, split_out=split_out, - dropna=self.dropna, sep=self.sep, sort=self.sort, as_index=self.as_index, + **self.dropna, )[self._slice] return super().aggregate( @@ 
-258,7 +479,7 @@ def groupby_agg( """ # Assert that aggregations are supported aggs = _redirect_aggs(aggs_in) - if not _is_supported(aggs, SUPPORTED_AGGS): + if not _aggs_supported(aggs, SUPPORTED_AGGS): raise ValueError( f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. " f"Aggregations must be specified with dict or list syntax." @@ -420,7 +641,7 @@ def _redirect_aggs(arg): @_dask_cudf_nvtx_annotate -def _is_supported(arg, supported: set): +def _aggs_supported(arg, supported: set): """Check that aggregations in `arg` are a subset of `supported`""" if isinstance(arg, (list, dict)): if isinstance(arg, dict): @@ -439,6 +660,14 @@ def _is_supported(arg, supported: set): return False +def _groupby_supported(gb): + """Check that groupby input is supported by dask-cudf""" + return isinstance(gb.obj, DaskDataFrame) and ( + isinstance(gb.by, str) + or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by)) + ) + + def _make_name(*args, sep="_"): """Combine elements of `args` into a new string""" _args = (arg for arg in args if arg != "") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 83ff1273b36..5be0cf7c887 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,7 +1,7 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. + import os -import shlex -import subprocess -import time +import socket from contextlib import contextmanager from io import BytesIO @@ -11,10 +11,25 @@ import dask_cudf -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") -requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer + + +@pytest.fixture(scope="session") +def endpoint_ip(): + return "127.0.0.1" + + +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port @contextmanager @@ -33,7 +48,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_ip, endpoint_port): """ Fixture to set up moto server in separate process """ @@ -42,47 +57,25 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" - - proc = subprocess.Popen( - shlex.split(f"moto_server s3 -p {endpoint_port}"), - ) + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" - timeout = 5 - while timeout > 0: - try: - # OK to go once server is accepting connections - r = requests.get(endpoint_uri) - if r.ok: - break - except Exception: - pass - timeout -= 0.1 - time.sleep(0.1) + server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) + server.start() yield endpoint_uri - - proc.terminate() - proc.wait() + server.stop() @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_ip, endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) - endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" + endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index e3545149c24..d2c9ecd0293 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -11,7 +11,7 @@ from cudf.core._compat import PANDAS_GE_120 import dask_cudf -from dask_cudf.groupby import SUPPORTED_AGGS, _is_supported +from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @@ -235,8 +235,7 @@ def test_groupby_split_out(split_out, column): @pytest.mark.parametrize( "by", ["a", "b", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]] ) -def test_groupby_dropna(dropna, by): - +def test_groupby_dropna_cudf(dropna, by): # NOTE: This test is borrowed from upstream dask # (dask/dask/dataframe/tests/test_groupby.py) df = cudf.DataFrame( @@ -265,6 +264,100 @@ def test_groupby_dropna(dropna, by): dd.assert_eq(dask_result, cudf_result) +@pytest.mark.parametrize( + "dropna,by", + [ + (False, "a"), + (False, "b"), + (False, "c"), + pytest.param( + False, + "d", + marks=pytest.mark.xfail( + reason="dropna=False is broken in Dask CPU for groupbys on " + "categorical columns" + ), + ), + pytest.param( + False, + ["a", "b"], + marks=pytest.mark.xfail( + reason="https://github.com/dask/dask/issues/8817" + ), + ), + pytest.param( + False, + ["a", "c"], + marks=pytest.mark.xfail( + reason="https://github.com/dask/dask/issues/8817" + ), + ), + pytest.param( + False, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + (True, "a"), + (True, "b"), + (True, "c"), + (True, "d"), + (True, ["a", "b"]), + (True, ["a", 
"c"]), + pytest.param( + True, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + (None, "a"), + (None, "b"), + (None, "c"), + (None, "d"), + (None, ["a", "b"]), + (None, ["a", "c"]), + pytest.param( + None, + ["a", "d"], + marks=pytest.mark.xfail( + reason="multi-col groupbys on categorical columns are broken " + "in Dask CPU" + ), + ), + ], +) +def test_groupby_dropna_dask(dropna, by): + # NOTE: This test is borrowed from upstream dask + # (dask/dask/dataframe/tests/test_groupby.py) + df = pd.DataFrame( + { + "a": [1, 2, 3, 4, None, None, 7, 8], + "b": [1, None, 1, 3, None, 3, 1, 3], + "c": ["a", "b", None, None, "e", "f", "g", "h"], + "e": [4, 5, 6, 3, 2, 1, 0, 0], + } + ) + df["b"] = df["b"].astype("datetime64[ns]") + df["d"] = df["c"].astype("category") + + gdf = cudf.from_pandas(df) + ddf = dd.from_pandas(df, npartitions=3) + gddf = dask_cudf.from_cudf(gdf, npartitions=3) + + if dropna is None: + dask_cudf_result = gddf.groupby(by).e.sum() + dask_result = ddf.groupby(by).e.sum() + else: + dask_cudf_result = gddf.groupby(by, dropna=dropna).e.sum() + dask_result = ddf.groupby(by, dropna=dropna).e.sum() + + dd.assert_eq(dask_cudf_result, dask_result) + + @pytest.mark.parametrize("myindex", [[1, 2] * 4, ["s1", "s2"] * 4]) def test_groupby_string_index_name(myindex): # GH-Issue #3420 @@ -570,7 +663,7 @@ def test_groupby_agg_redirect(aggregations): [["not_supported"], {"a": "not_supported"}, {"a": ["not_supported"]}], ) def test_is_supported(arg): - assert _is_supported(arg, {"supported"}) is False + assert _aggs_supported(arg, {"supported"}) is False def test_groupby_unique_lists():