From f0efc8b36a8f43cfa027966265dcea052bb5c45d Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 23 Jul 2024 17:17:05 -0700 Subject: [PATCH] Modify `make_host_vector` and `make_device_uvector` factories to optionally use pinned memory and kernel copy (#16206) Issue #15616 Modified `make_host_vector` functions to return `cudf::detail::host_vector`, which can use a pinned or a pageable memory resource. When pinned memory is used, the D2H copy is potentially done using a CUDA kernel. Also added factories to create `host_vector`s without device data. These are useful to replace uses of `std::vector` and `thrust::host_vector` when the data eventually gets copied to the GPU. Added `is_device_accessible` to `host_span`. With this, `make_device_uvector` can optionally use the kernel for the H2D copy. Modified `cudf::detail::host_vector` to be derived from `thrust::host_vector`, to avoid issues with implicit conversion from `std::vector`. Used `cudf::detail::host_vector` and its new factory functions wherever data ends up copied to the GPU. Stopped using `thrust::copy_n` for the kernel copy path in `cuda_memcpy` because of an optimization that allows it to fall back to `cudaMemcpyAsync`. We now call a simple local kernel. 
Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Robert Maynard (https://github.com/robertmaynard) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) - Alessandro Bellina (https://github.com/abellina) URL: https://github.com/rapidsai/cudf/pull/16206 --- cpp/CMakeLists.txt | 2 +- cpp/include/cudf/detail/gather.cuh | 2 +- cpp/include/cudf/detail/null_mask.cuh | 4 +- .../cudf/detail/utilities/host_memory.hpp | 51 +++++++++ .../cudf/detail/utilities/host_vector.hpp | 24 +++- .../detail/utilities/vector_factories.hpp | 106 ++++++++++++------ cpp/include/cudf/io/text/detail/trie.hpp | 4 +- cpp/include/cudf/lists/detail/dremel.hpp | 10 +- cpp/include/cudf/utilities/pinned_memory.hpp | 16 +++ cpp/include/cudf/utilities/span.hpp | 32 ++++++ cpp/src/copying/concatenate.cu | 6 +- cpp/src/copying/contiguous_split.cu | 3 +- cpp/src/datetime/timezone.cpp | 6 +- cpp/src/dictionary/detail/concatenate.cu | 2 +- cpp/src/io/avro/reader_impl.cu | 8 +- cpp/src/io/csv/reader_impl.cu | 44 +++++--- cpp/src/io/json/json_column.cu | 4 +- cpp/src/io/json/nested_json_gpu.cu | 6 +- cpp/src/io/json/read_json.cu | 3 +- cpp/src/io/orc/reader_impl_decode.cu | 10 +- cpp/src/io/orc/stripe_enc.cu | 4 +- cpp/src/io/orc/writer_impl.cu | 50 +++++---- cpp/src/io/orc/writer_impl.hpp | 9 +- cpp/src/io/parquet/predicate_pushdown.cpp | 20 ++-- cpp/src/io/parquet/reader_impl_chunking.cu | 78 +++++++------ cpp/src/io/parquet/reader_impl_preprocess.cu | 10 +- cpp/src/io/parquet/writer_impl.cu | 7 +- cpp/src/lists/dremel.cu | 6 +- cpp/src/strings/combine/join.cu | 6 +- cpp/src/strings/convert/convert_datetime.cu | 2 +- cpp/src/strings/copying/concatenate.cu | 2 +- cpp/src/strings/filter_chars.cu | 2 +- cpp/src/strings/replace/multi_re.cu | 2 +- cpp/src/strings/translate.cu | 2 +- cpp/src/table/row_operators.cu | 5 +- cpp/src/utilities/cuda_memcpy.cu | 20 +++- .../{pinned_memory.cpp => host_memory.cpp} | 
86 +++++++++++++- cpp/tests/io/json/json_tree.cpp | 6 +- cpp/tests/strings/integers_tests.cpp | 4 +- .../utilities_tests/pinned_memory_tests.cpp | 67 ++++++++++- 40 files changed, 539 insertions(+), 192 deletions(-) create mode 100644 cpp/include/cudf/detail/utilities/host_memory.hpp rename cpp/src/utilities/{pinned_memory.cpp => host_memory.cpp} (73%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 5e79204a558..a2c2dd3af4c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -671,9 +671,9 @@ add_library( src/unary/null_ops.cu src/utilities/cuda_memcpy.cu src/utilities/default_stream.cpp + src/utilities/host_memory.cpp src/utilities/linked_column.cpp src/utilities/logger.cpp - src/utilities/pinned_memory.cpp src/utilities/prefetch.cpp src/utilities/stacktrace.cpp src/utilities/stream_pool.cpp diff --git a/cpp/include/cudf/detail/gather.cuh b/cpp/include/cudf/detail/gather.cuh index 5977c7341c1..d3e9fc4974d 100644 --- a/cpp/include/cudf/detail/gather.cuh +++ b/cpp/include/cudf/detail/gather.cuh @@ -577,7 +577,7 @@ void gather_bitmask(table_view const& source, } // Make device array of target bitmask pointers - std::vector target_masks(target.size()); + auto target_masks = make_host_vector(target.size(), stream); std::transform(target.begin(), target.end(), target_masks.begin(), [](auto const& col) { return col->mutable_view().null_mask(); }); diff --git a/cpp/include/cudf/detail/null_mask.cuh b/cpp/include/cudf/detail/null_mask.cuh index e62675cbc8c..ae6db5409cc 100644 --- a/cpp/include/cudf/detail/null_mask.cuh +++ b/cpp/include/cudf/detail/null_mask.cuh @@ -430,7 +430,9 @@ std::vector segmented_count_bits(bitmask_type const* bitmask, if (num_segments == 0) { return std::vector{}; } // Construct a contiguous host buffer of indices and copy to device. 
- auto const h_indices = std::vector(indices_begin, indices_end); + auto h_indices = make_empty_host_vector::value_type>( + std::distance(indices_begin, indices_end), stream); + std::copy(indices_begin, indices_end, std::back_inserter(h_indices)); auto const d_indices = make_device_uvector_async(h_indices, stream, rmm::mr::get_current_device_resource()); diff --git a/cpp/include/cudf/detail/utilities/host_memory.hpp b/cpp/include/cudf/detail/utilities/host_memory.hpp new file mode 100644 index 00000000000..c6775a950c9 --- /dev/null +++ b/cpp/include/cudf/detail/utilities/host_memory.hpp @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +#include + +namespace cudf::detail { +/** + * @brief Get the memory resource to be used for pageable memory allocations. + * + * @return Reference to the pageable memory resource + */ +CUDF_EXPORT rmm::host_async_resource_ref get_pageable_memory_resource(); + +/** + * @brief Get the allocator to be used for the host memory allocation. 
+ * + * @param size The number of elements of type T to allocate + * @param stream The stream to use for the allocation + * @return The allocator to be used for the host memory allocation + */ +template +rmm_host_allocator get_host_allocator(std::size_t size, rmm::cuda_stream_view stream) +{ + if (size * sizeof(T) <= get_allocate_host_as_pinned_threshold()) { + return {get_pinned_memory_resource(), stream}; + } + return {get_pageable_memory_resource(), stream}; +} + +} // namespace cudf::detail diff --git a/cpp/include/cudf/detail/utilities/host_vector.hpp b/cpp/include/cudf/detail/utilities/host_vector.hpp index 2d14d0306cd..f4e5f718da4 100644 --- a/cpp/include/cudf/detail/utilities/host_vector.hpp +++ b/cpp/include/cudf/detail/utilities/host_vector.hpp @@ -61,6 +61,10 @@ class rmm_host_allocator { }; }; +template +inline constexpr bool contains_property = + (cuda::std::is_same_v || ... || false); + /*! \p rmm_host_allocator is a CUDA-specific host memory allocator * that employs \c `rmm::host_async_resource_ref` for allocation. 
* @@ -100,8 +104,12 @@ class rmm_host_allocator { /** * @brief Construct from a `cudf::host_async_resource_ref` */ - rmm_host_allocator(rmm::host_async_resource_ref _mr, rmm::cuda_stream_view _stream) - : mr(_mr), stream(_stream) + template + rmm_host_allocator(cuda::mr::async_resource_ref _mr, + rmm::cuda_stream_view _stream) + : mr(_mr), + stream(_stream), + _is_device_accessible{contains_property} { } @@ -173,15 +181,25 @@ class rmm_host_allocator { */ inline bool operator!=(rmm_host_allocator const& x) const { return !operator==(x); } + bool is_device_accessible() const { return _is_device_accessible; } + private: rmm::host_async_resource_ref mr; rmm::cuda_stream_view stream; + bool _is_device_accessible; }; /** * @brief A vector class with rmm host memory allocator */ template -using host_vector = thrust::host_vector>; +class host_vector : public thrust::host_vector> { + public: + using base = thrust::host_vector>; + + host_vector(rmm_host_allocator const& alloc) : base(alloc) {} + + host_vector(size_t size, rmm_host_allocator const& alloc) : base(size, alloc) {} +}; } // namespace cudf::detail diff --git a/cpp/include/cudf/detail/utilities/vector_factories.hpp b/cpp/include/cudf/detail/utilities/vector_factories.hpp index 20cb55bb1c7..45dc839c9bd 100644 --- a/cpp/include/cudf/detail/utilities/vector_factories.hpp +++ b/cpp/include/cudf/detail/utilities/vector_factories.hpp @@ -21,6 +21,8 @@ * @file vector_factories.hpp */ +#include +#include #include #include #include @@ -32,8 +34,6 @@ #include #include -#include - #include namespace cudf { @@ -100,11 +100,12 @@ rmm::device_uvector make_device_uvector_async(host_span source_data, rmm::device_async_resource_ref mr) { rmm::device_uvector ret(source_data.size(), stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(ret.data(), - source_data.data(), - source_data.size() * sizeof(T), - cudaMemcpyDefault, - stream.value())); + auto const is_pinned = source_data.is_device_accessible(); + cuda_memcpy_async(ret.data(), + 
source_data.data(), + source_data.size() * sizeof(T), + is_pinned ? host_memory_kind::PINNED : host_memory_kind::PAGEABLE, + stream); return ret; } @@ -271,21 +272,11 @@ rmm::device_uvector make_device_uvector_sync( return make_device_uvector_sync(device_span{c}, stream, mr); } -// Utility function template to allow copying to either a thrust::host_vector or std::vector -template -OutContainer make_vector_async(device_span v, rmm::cuda_stream_view stream) -{ - OutContainer result(v.size()); - CUDF_CUDA_TRY(cudaMemcpyAsync( - result.data(), v.data(), v.size() * sizeof(T), cudaMemcpyDefault, stream.value())); - return result; -} - /** * @brief Asynchronously construct a `std::vector` containing a copy of data from a * `device_span` * - * @note This function does not synchronize `stream`. + * @note This function does not synchronize `stream` after the copy. * * @tparam T The type of the data to copy * @param source_data The device data to copy @@ -295,14 +286,17 @@ OutContainer make_vector_async(device_span v, rmm::cuda_stream_view str template std::vector make_std_vector_async(device_span v, rmm::cuda_stream_view stream) { - return make_vector_async>(v, stream); + std::vector result(v.size()); + CUDF_CUDA_TRY(cudaMemcpyAsync( + result.data(), v.data(), v.size() * sizeof(T), cudaMemcpyDefault, stream.value())); + return result; } /** * @brief Asynchronously construct a `std::vector` containing a copy of data from a device * container * - * @note This function synchronizes `stream`. + * @note This function synchronizes `stream` after the copy. * * @tparam Container The type of the container to copy from * @tparam T The type of the data to copy @@ -324,7 +318,7 @@ std::vector make_std_vector_async(Container cons * @brief Synchronously construct a `std::vector` containing a copy of data from a * `device_span` * - * @note This function does a synchronize on `stream`. + * @note This function does a synchronize on `stream` after the copy. 
* * @tparam T The type of the data to copy * @param source_data The device data to copy @@ -361,11 +355,46 @@ std::vector make_std_vector_sync(Container const return make_std_vector_sync(device_span{c}, stream); } +/** + * @brief Construct a `cudf::detail::host_vector` of the given size. + * + * @note The returned vector may be using a pinned memory resource. + * + * @tparam T The type of the vector data + * @param size The number of elements in the created vector + * @param stream The stream on which to allocate memory + * @return A host_vector of the given size + */ +template +host_vector make_host_vector(size_t size, rmm::cuda_stream_view stream) +{ + return host_vector(size, get_host_allocator(size, stream)); +} + +/** + * @brief Construct an empty `cudf::detail::host_vector` with the given capacity. + * + * @note The returned vector may be using a pinned memory resource. + * + * @tparam T The type of the vector data + * @param capacity Initial capacity of the vector + * @param stream The stream on which to allocate memory + * @return A host_vector with the given capacity + */ +template +host_vector make_empty_host_vector(size_t capacity, rmm::cuda_stream_view stream) +{ + auto result = host_vector(get_host_allocator(capacity, stream)); + result.reserve(capacity); + return result; +} + /** * @brief Asynchronously construct a `thrust::host_vector` containing a copy of data from a * `device_span` * - * @note This function does not synchronize `stream`. + * @note This function does not synchronize `stream` after the copy. The returned vector may be + * using a pinned memory resource. 
* * @tparam T The type of the data to copy * @param source_data The device data to copy @@ -373,16 +402,24 @@ std::vector make_std_vector_sync(Container const * @return The data copied to the host */ template -thrust::host_vector make_host_vector_async(device_span v, rmm::cuda_stream_view stream) +host_vector make_host_vector_async(device_span v, rmm::cuda_stream_view stream) { - return make_vector_async>(v, stream); + auto result = make_host_vector(v.size(), stream); + auto const is_pinned = result.get_allocator().is_device_accessible(); + cuda_memcpy_async(result.data(), + v.data(), + v.size() * sizeof(T), + is_pinned ? host_memory_kind::PINNED : host_memory_kind::PAGEABLE, + stream); + return result; } /** * @brief Asynchronously construct a `std::vector` containing a copy of data from a device * container * - * @note This function does not synchronize `stream`. + * @note This function does not synchronize `stream` after the copy. The returned vector may be + * using a pinned memory resource. * * @tparam Container The type of the container to copy from * @tparam T The type of the data to copy @@ -394,8 +431,8 @@ template < typename Container, std::enable_if_t< std::is_convertible_v>>* = nullptr> -thrust::host_vector make_host_vector_async( - Container const& c, rmm::cuda_stream_view stream) +host_vector make_host_vector_async(Container const& c, + rmm::cuda_stream_view stream) { return make_host_vector_async(device_span{c}, stream); } @@ -404,7 +441,8 @@ thrust::host_vector make_host_vector_async( * @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a * `device_span` * - * @note This function does a synchronize on `stream`. + * @note This function does a synchronize on `stream` after the copy. The returned vector may be + * using a pinned memory resource. 
* * @tparam T The type of the data to copy * @param source_data The device data to copy @@ -412,7 +450,7 @@ thrust::host_vector make_host_vector_async( * @return The data copied to the host */ template -thrust::host_vector make_host_vector_sync(device_span v, rmm::cuda_stream_view stream) +host_vector make_host_vector_sync(device_span v, rmm::cuda_stream_view stream) { auto result = make_host_vector_async(v, stream); stream.synchronize(); @@ -423,7 +461,7 @@ thrust::host_vector make_host_vector_sync(device_span v, rmm::cuda_s * @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a device * container * - * @note This function synchronizes `stream`. + * @note This function synchronizes `stream` after the copy. * * @tparam Container The type of the container to copy from * @tparam T The type of the data to copy @@ -435,8 +473,8 @@ template < typename Container, std::enable_if_t< std::is_convertible_v>>* = nullptr> -thrust::host_vector make_host_vector_sync( - Container const& c, rmm::cuda_stream_view stream) +host_vector make_host_vector_sync(Container const& c, + rmm::cuda_stream_view stream) { return make_host_vector_sync(device_span{c}, stream); } @@ -444,7 +482,7 @@ thrust::host_vector make_host_vector_sync( /** * @brief Asynchronously construct a pinned `cudf::detail::host_vector` of the given size * - * @note This function may not synchronize `stream`. + * @note This function may not synchronize `stream` after the copy. * * @tparam T The type of the vector data * @param size The number of elements in the created vector @@ -460,7 +498,7 @@ host_vector make_pinned_vector_async(size_t size, rmm::cuda_stream_view strea /** * @brief Synchronously construct a pinned `cudf::detail::host_vector` of the given size * - * @note This function synchronizes `stream`. + * @note This function synchronizes `stream` after the copy. 
* * @tparam T The type of the vector data * @param size The number of elements in the created vector diff --git a/cpp/include/cudf/io/text/detail/trie.hpp b/cpp/include/cudf/io/text/detail/trie.hpp index e0b9c7635e3..28862d97ede 100644 --- a/cpp/include/cudf/io/text/detail/trie.hpp +++ b/cpp/include/cudf/io/text/detail/trie.hpp @@ -223,11 +223,11 @@ struct trie { match_length.emplace_back(0); - std::vector trie_nodes; auto token_counts = std::unordered_map(); + auto trie_nodes = cudf::detail::make_empty_host_vector(tokens.size(), stream); for (uint32_t i = 0; i < tokens.size(); i++) { - trie_nodes.emplace_back(trie_node{tokens[i], match_length[i], transitions[i]}); + trie_nodes.push_back(trie_node{tokens[i], match_length[i], transitions[i]}); token_counts[tokens[i]]++; } diff --git a/cpp/include/cudf/lists/detail/dremel.hpp b/cpp/include/cudf/lists/detail/dremel.hpp index d36a4091947..53448424827 100644 --- a/cpp/include/cudf/lists/detail/dremel.hpp +++ b/cpp/include/cudf/lists/detail/dremel.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -31,8 +31,8 @@ struct dremel_device_view { size_type const* offsets; uint8_t const* rep_levels; uint8_t const* def_levels; - size_type const leaf_data_size; - uint8_t const max_def_level; + size_type leaf_data_size; + uint8_t max_def_level; }; /** @@ -45,8 +45,8 @@ struct dremel_data { rmm::device_uvector rep_level; rmm::device_uvector def_level; - size_type const leaf_data_size; - uint8_t const max_def_level; + size_type leaf_data_size; + uint8_t max_def_level; operator dremel_device_view() const { diff --git a/cpp/include/cudf/utilities/pinned_memory.hpp b/cpp/include/cudf/utilities/pinned_memory.hpp index 3e2fa43cb50..fa7e1b35327 100644 --- a/cpp/include/cudf/utilities/pinned_memory.hpp +++ b/cpp/include/cudf/utilities/pinned_memory.hpp @@ -71,4 +71,20 @@ void set_kernel_pinned_copy_threshold(size_t threshold); */ size_t get_kernel_pinned_copy_threshold(); +/** + * @brief Set the threshold size for allocating host memory as pinned memory. + * + * @param threshold The threshold size in bytes. If the size of the allocation is less or equal to + * this threshold, the memory will be allocated as pinned memory. If the size is greater than this + * threshold, the memory will be allocated as pageable memory. + */ +void set_allocate_host_as_pinned_threshold(size_t threshold); + +/** + * @brief Get the threshold size for allocating host memory as pinned memory. + * + * @return The threshold size in bytes. 
+ */ +size_t get_allocate_host_as_pinned_threshold(); + } // namespace cudf diff --git a/cpp/include/cudf/utilities/span.hpp b/cpp/include/cudf/utilities/span.hpp index 3b35e60e034..c5054c733a7 100644 --- a/cpp/include/cudf/utilities/span.hpp +++ b/cpp/include/cudf/utilities/span.hpp @@ -16,6 +16,8 @@ #pragma once +#include + #include #include #include @@ -257,6 +259,26 @@ struct host_span : public cudf::detail::span_base>* = nullptr> + constexpr host_span(cudf::detail::host_vector& in) + : base(in.data(), in.size()), _is_device_accessible{in.get_allocator().is_device_accessible()} + { + } + + /// Constructor from a const host_vector + /// @param in The host_vector to construct the span from + template >* = nullptr> + constexpr host_span(cudf::detail::host_vector const& in) + : base(in.data(), in.size()), _is_device_accessible{in.get_allocator().is_device_accessible()} + { + } + // Copy construction to support const conversion /// @param other The span to copy template views, rmm::cuda_stream_vi }); // Assemble contiguous array of device views - auto device_views = thrust::host_vector(); - device_views.reserve(views.size()); + auto device_views = + cudf::detail::make_empty_host_vector(views.size(), stream); std::transform(device_view_owners.cbegin(), device_view_owners.cend(), std::back_inserter(device_views), @@ -84,7 +84,7 @@ auto create_device_views(host_span views, rmm::cuda_stream_vi make_device_uvector_async(device_views, stream, rmm::mr::get_current_device_resource()); // Compute the partition offsets - auto offsets = thrust::host_vector(views.size() + 1); + auto offsets = cudf::detail::make_host_vector(views.size() + 1, stream); thrust::transform_inclusive_scan( thrust::host, device_views.cbegin(), diff --git a/cpp/src/copying/contiguous_split.cu b/cpp/src/copying/contiguous_split.cu index 37db2c74790..95544742fb7 100644 --- a/cpp/src/copying/contiguous_split.cu +++ b/cpp/src/copying/contiguous_split.cu @@ -1539,7 +1539,8 @@ std::unique_ptr 
chunk_iteration_state::create( std::vector num_batches_per_iteration; std::vector size_of_batches_per_iteration; - std::vector accum_size_per_iteration; + auto accum_size_per_iteration = + cudf::detail::make_empty_host_vector(h_offsets.size(), stream); std::size_t accum_size = 0; { auto current_offset_it = h_offsets.begin(); diff --git a/cpp/src/datetime/timezone.cpp b/cpp/src/datetime/timezone.cpp index 1b0d201501b..7ca1b51df98 100644 --- a/cpp/src/datetime/timezone.cpp +++ b/cpp/src/datetime/timezone.cpp @@ -485,14 +485,12 @@ std::unique_ptr make_timezone_transition_table(std::optional ttimes_typed; - ttimes_typed.reserve(transition_times.size()); + auto ttimes_typed = make_empty_host_vector(transition_times.size(), stream); std::transform(transition_times.cbegin(), transition_times.cend(), std::back_inserter(ttimes_typed), [](auto ts) { return timestamp_s{duration_s{ts}}; }); - std::vector offsets_typed; - offsets_typed.reserve(offsets.size()); + auto offsets_typed = make_empty_host_vector(offsets.size(), stream); std::transform(offsets.cbegin(), offsets.cend(), std::back_inserter(offsets_typed), [](auto ts) { return duration_s{ts}; }); diff --git a/cpp/src/dictionary/detail/concatenate.cu b/cpp/src/dictionary/detail/concatenate.cu index fdc3d9d0ecf..72828309425 100644 --- a/cpp/src/dictionary/detail/concatenate.cu +++ b/cpp/src/dictionary/detail/concatenate.cu @@ -105,7 +105,7 @@ struct compute_children_offsets_fn { */ rmm::device_uvector create_children_offsets(rmm::cuda_stream_view stream) { - std::vector offsets(columns_ptrs.size()); + auto offsets = cudf::detail::make_host_vector(columns_ptrs.size(), stream); thrust::transform_exclusive_scan( thrust::host, columns_ptrs.begin(), diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 814efe2b5a1..69a0e982a5b 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -554,9 +554,11 @@ table_with_metadata read_avro(std::unique_ptr&& source, auto 
d_global_dict_data = rmm::device_uvector(0, stream); if (total_dictionary_entries > 0) { - auto h_global_dict = std::vector(total_dictionary_entries); - auto h_global_dict_data = std::vector(dictionary_data_size); - size_t dict_pos = 0; + auto h_global_dict = + cudf::detail::make_host_vector(total_dictionary_entries, stream); + auto h_global_dict_data = + cudf::detail::make_host_vector(dictionary_data_size, stream); + size_t dict_pos = 0; for (size_t i = 0; i < column_types.size(); ++i) { auto const col_idx = selected_columns[i].first; diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 05faded651d..40d4372ae9d 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -567,7 +567,7 @@ void infer_column_types(parse_options const& parse_opts, } std::vector decode_data(parse_options const& parse_opts, - std::vector const& column_flags, + host_span column_flags, std::vector const& column_names, device_span data, device_span row_offsets, @@ -592,8 +592,8 @@ std::vector decode_data(parse_options const& parse_opts, } } - thrust::host_vector h_data(num_active_columns); - thrust::host_vector h_valid(num_active_columns); + auto h_data = cudf::detail::make_host_vector(num_active_columns, stream); + auto h_valid = cudf::detail::make_host_vector(num_active_columns, stream); for (int i = 0; i < num_active_columns; ++i) { h_data[i] = out_buffers[i].data(); @@ -622,14 +622,16 @@ std::vector decode_data(parse_options const& parse_opts, return out_buffers; } -std::vector determine_column_types(csv_reader_options const& reader_opts, - parse_options const& parse_opts, - host_span column_names, - device_span data, - device_span row_offsets, - int32_t num_records, - host_span column_flags, - rmm::cuda_stream_view stream) +cudf::detail::host_vector determine_column_types( + csv_reader_options const& reader_opts, + parse_options const& parse_opts, + host_span column_names, + device_span data, + device_span row_offsets, + int32_t 
num_records, + host_span column_flags, + cudf::size_type num_active_columns, + rmm::cuda_stream_view stream) { std::vector column_types(column_flags.size()); @@ -653,7 +655,8 @@ std::vector determine_column_types(csv_reader_options const& reader_o stream); // compact column_types to only include active columns - std::vector active_col_types; + auto active_col_types = + cudf::detail::make_empty_host_vector(num_active_columns, stream); std::copy_if(column_types.cbegin(), column_types.cend(), std::back_inserter(active_col_types), @@ -697,8 +700,10 @@ table_with_metadata read_csv(cudf::io::datasource* source, auto const num_actual_columns = static_cast(column_names.size()); auto num_active_columns = num_actual_columns; - auto column_flags = std::vector( - num_actual_columns, column_parse::enabled | column_parse::inferred); + auto column_flags = + cudf::detail::make_host_vector(num_actual_columns, stream); + std::fill( + column_flags.begin(), column_flags.end(), column_parse::enabled | column_parse::inferred); // User did not pass column names to override names in the file // Process names from the file to remove empty and duplicated strings @@ -842,8 +847,15 @@ table_with_metadata read_csv(cudf::io::datasource* source, // Exclude the end-of-data row from number of rows with actual data auto const num_records = std::max(row_offsets.size(), 1ul) - 1; - auto const column_types = determine_column_types( - reader_opts, parse_opts, column_names, data, row_offsets, num_records, column_flags, stream); + auto const column_types = determine_column_types(reader_opts, + parse_opts, + column_names, + data, + row_offsets, + num_records, + column_flags, + num_active_columns, + stream); auto metadata = table_metadata{}; auto out_columns = std::vector>(); diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index 3e587768b11..17fa7abdffe 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -622,7 +622,7 @@ void 
make_device_json_column(device_span input, // map{parent_col_id, child_col_name}> = child_col_id, used for null value column tracking std::map, NodeIndexT> mapped_columns; // find column_ids which are values, but should be ignored in validity - std::vector ignore_vals(num_columns, 0); + auto ignore_vals = cudf::detail::make_host_vector(num_columns, stream); std::vector is_mixed_type_column(num_columns, 0); std::vector is_pruned(num_columns, 0); columns.try_emplace(parent_node_sentinel, std::ref(root)); @@ -812,7 +812,7 @@ void make_device_json_column(device_span input, return thrust::get<1>(a) < thrust::get<1>(b); }); // move columns data to device. - std::vector columns_data(num_columns); + auto columns_data = cudf::detail::make_host_vector(num_columns, stream); for (auto& [col_id, col_ref] : columns) { if (col_id == parent_node_sentinel) continue; auto& col = col_ref.get(); diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 8decaf034f3..1e484d74679 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -1703,10 +1703,8 @@ void make_json_column(json_column& root_column, auto const [d_tokens_gpu, d_token_indices_gpu] = get_token_stream(d_input, options, stream, mr); // Copy the JSON tokens to the host - thrust::host_vector tokens = - cudf::detail::make_host_vector_async(d_tokens_gpu, stream); - thrust::host_vector token_indices_gpu = - cudf::detail::make_host_vector_async(d_token_indices_gpu, stream); + auto tokens = cudf::detail::make_host_vector_async(d_tokens_gpu, stream); + auto token_indices_gpu = cudf::detail::make_host_vector_async(d_token_indices_gpu, stream); // Make sure tokens have been copied to the host stream.synchronize(); diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 0ba4dedfc34..590f70864b1 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -78,10 +78,9 @@ device_span ingest_raw_input(device_span buffer, auto 
constexpr num_delimiter_chars = 1; if (compression == compression_type::NONE) { - std::vector delimiter_map{}; + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); std::vector prefsum_source_sizes(sources.size()); std::vector> h_buffers; - delimiter_map.reserve(sources.size()); size_t bytes_read = 0; std::transform_inclusive_scan(sources.begin(), sources.end(), diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index 8e20505d3ff..e3b9a048be8 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -492,11 +492,17 @@ void scan_null_counts(cudf::detail::hostdevice_2dvector const& if (num_stripes == 0) return; auto const num_columns = chunks.size().second; - std::vector> prefix_sums_to_update; + auto const num_struct_cols = + std::count_if(chunks[0].begin(), chunks[0].end(), [](auto const& chunk) { + return chunk.type_kind == STRUCT; + }); + auto prefix_sums_to_update = + cudf::detail::make_empty_host_vector>(num_struct_cols, + stream); for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) { // Null counts sums are only needed for children of struct columns if (chunks[0][col_idx].type_kind == STRUCT) { - prefix_sums_to_update.emplace_back(col_idx, d_prefix_sums + num_stripes * col_idx); + prefix_sums_to_update.push_back({col_idx, d_prefix_sums + num_stripes * col_idx}); } } auto const d_prefix_sums_to_update = cudf::detail::make_device_uvector_async( diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 805959327ac..80f32512b98 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1417,8 +1417,8 @@ void decimal_sizes_to_offsets(device_2dspan rg_bounds, if (rg_bounds.count() == 0) return; // Convert map to a vector of views of the `elem_sizes` device buffers - std::vector h_sizes; - h_sizes.reserve(elem_sizes.size()); + auto h_sizes = + cudf::detail::make_empty_host_vector(elem_sizes.size(), stream); 
std::transform(elem_sizes.begin(), elem_sizes.end(), std::back_inserter(h_sizes), [](auto& p) { return decimal_column_element_sizes{p.first, p.second}; }); diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 4cb20bb7518..f3b8cfbc836 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -444,14 +444,17 @@ namespace { */ file_segmentation calculate_segmentation(host_span columns, hostdevice_2dvector&& rowgroup_bounds, - stripe_size_limits max_stripe_size) + stripe_size_limits max_stripe_size, + rmm::cuda_stream_view stream) { - std::vector infos; - auto const num_rowgroups = rowgroup_bounds.size().first; - size_t stripe_start = 0; - size_t stripe_bytes = 0; - size_type stripe_rows = 0; - for (size_t rg_idx = 0; rg_idx < num_rowgroups; ++rg_idx) { + // Number of stripes is not known in advance. Only reserve a single element to use pinned memory + // resource if at all enabled. + auto infos = cudf::detail::make_empty_host_vector(1, stream); + size_type const num_rowgroups = rowgroup_bounds.size().first; + size_type stripe_start = 0; + size_t stripe_bytes = 0; + size_type stripe_rows = 0; + for (size_type rg_idx = 0; rg_idx < num_rowgroups; ++rg_idx) { auto const rowgroup_total_bytes = std::accumulate(columns.begin(), columns.end(), 0ul, [&](size_t total_size, auto const& col) { auto const rows = rowgroup_bounds[rg_idx][col.index()].size(); @@ -470,7 +473,9 @@ file_segmentation calculate_segmentation(host_span column // Check if adding the current rowgroup to the stripe will make the stripe too large or long if ((rg_idx > stripe_start) && (stripe_bytes + rowgroup_total_bytes > max_stripe_size.bytes || stripe_rows + rowgroup_rows_max > max_stripe_size.rows)) { - infos.emplace_back(infos.size(), stripe_start, rg_idx - stripe_start); + infos.push_back(stripe_rowgroups{static_cast(infos.size()), + stripe_start, + static_cast(rg_idx - stripe_start)}); stripe_start = rg_idx; stripe_bytes = 0; stripe_rows = 0; @@ -479,7 
+484,9 @@ file_segmentation calculate_segmentation(host_span column stripe_bytes += rowgroup_total_bytes; stripe_rows += rowgroup_rows_max; if (rg_idx + 1 == num_rowgroups) { - infos.emplace_back(infos.size(), stripe_start, num_rowgroups - stripe_start); + infos.push_back(stripe_rowgroups{static_cast(infos.size()), + stripe_start, + static_cast(num_rowgroups - stripe_start)}); } } @@ -1336,7 +1343,7 @@ encoded_footer_statistics finish_statistic_blobs(Footer const& footer, if (num_file_blobs == 0) { return {}; } // Create empty file stats and merge groups - std::vector h_stat_chunks(num_file_blobs); + auto h_stat_chunks = cudf::detail::make_host_vector(num_file_blobs, stream); cudf::detail::hostdevice_vector stats_merge(num_file_blobs, stream); // Fill in stats_merge and stat_chunks on the host for (auto i = 0u; i < num_file_blobs; ++i) { @@ -1677,39 +1684,39 @@ struct pushdown_null_masks { // Owning vector for masks in device memory std::vector> data; // Pointers to pushdown masks in device memory. Can be same for multiple columns. - std::vector masks; + cudf::detail::host_vector masks; }; pushdown_null_masks init_pushdown_null_masks(orc_table_view& orc_table, rmm::cuda_stream_view stream) { - std::vector mask_ptrs; - mask_ptrs.reserve(orc_table.num_columns()); + auto mask_ptrs = + cudf::detail::make_empty_host_vector(orc_table.num_columns(), stream); std::vector> pd_masks; for (auto const& col : orc_table.columns) { // Leaf columns don't need pushdown masks if (col.num_children() == 0) { - mask_ptrs.emplace_back(nullptr); + mask_ptrs.push_back({nullptr}); continue; } auto const parent_pd_mask = col.is_child() ? 
mask_ptrs[col.parent_index()] : nullptr; auto const null_mask = col.null_mask(); if (null_mask == nullptr and parent_pd_mask == nullptr) { - mask_ptrs.emplace_back(nullptr); + mask_ptrs.push_back({nullptr}); continue; } if (col.orc_kind() == STRUCT) { if (null_mask != nullptr and parent_pd_mask == nullptr) { // Reuse own null mask - mask_ptrs.emplace_back(null_mask); + mask_ptrs.push_back(null_mask); } else if (null_mask == nullptr and parent_pd_mask != nullptr) { // Reuse parent's pushdown mask - mask_ptrs.emplace_back(parent_pd_mask); + mask_ptrs.push_back(parent_pd_mask); } else { // Both are nullable, allocate new pushdown mask pd_masks.emplace_back(num_bitmask_words(col.size()), stream); - mask_ptrs.emplace_back(pd_masks.back().data()); + mask_ptrs.push_back({pd_masks.back().data()}); thrust::transform(rmm::exec_policy(stream), null_mask, @@ -1724,7 +1731,7 @@ pushdown_null_masks init_pushdown_null_masks(orc_table_view& orc_table, auto const child_col = orc_table.column(col.child_begin()[0]); // pushdown mask applies to child column(s); use the child column size pd_masks.emplace_back(num_bitmask_words(child_col.size()), stream); - mask_ptrs.emplace_back(pd_masks.back().data()); + mask_ptrs.push_back({pd_masks.back().data()}); pushdown_lists_null_mask(col, orc_table.d_columns, parent_pd_mask, pd_masks.back(), stream); } } @@ -1815,8 +1822,7 @@ orc_table_view make_orc_table_view(table_view const& table, append_orc_column(table.column(col_idx), nullptr, table_meta.column_metadata[col_idx]); } - std::vector type_kinds; - type_kinds.reserve(orc_columns.size()); + auto type_kinds = cudf::detail::make_empty_host_vector(orc_columns.size(), stream); std::transform( orc_columns.cbegin(), orc_columns.cend(), std::back_inserter(type_kinds), [](auto& orc_column) { return orc_column.orc_kind(); @@ -2299,7 +2305,7 @@ auto convert_table_to_orc_data(table_view const& input, // Decide stripe boundaries based on rowgroups and char counts auto segmentation = - 
calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size); + calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size, stream); auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index bd082befe0c..f5f8b3cfed9 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -78,10 +78,9 @@ struct orc_table_view { * Provides a container-like interface to iterate over rowgroup indices. */ struct stripe_rowgroups { - uint32_t id; // stripe id - uint32_t first; // first rowgroup in the stripe - uint32_t size; // number of rowgroups in the stripe - stripe_rowgroups(uint32_t id, uint32_t first, uint32_t size) : id{id}, first{first}, size{size} {} + size_type id; // stripe id + size_type first; // first rowgroup in the stripe + size_type size; // number of rowgroups in the stripe [[nodiscard]] auto cbegin() const { return thrust::make_counting_iterator(first); } [[nodiscard]] auto cend() const { return thrust::make_counting_iterator(first + size); } }; @@ -125,7 +124,7 @@ class orc_streams { */ struct file_segmentation { hostdevice_2dvector rowgroups; - std::vector stripes; + cudf::detail::host_vector stripes; auto num_rowgroups() const noexcept { return rowgroups.size().first; } auto num_stripes() const noexcept { return stripes.size(); } diff --git a/cpp/src/io/parquet/predicate_pushdown.cpp b/cpp/src/io/parquet/predicate_pushdown.cpp index 11f4a00ee8b..481c1e9fcdd 100644 --- a/cpp/src/io/parquet/predicate_pushdown.cpp +++ b/cpp/src/io/parquet/predicate_pushdown.cpp @@ -141,11 +141,11 @@ struct stats_caster { // Local struct to hold host columns struct host_column { // using thrust::host_vector because std::vector uses bitmap instead of byte per bool. 
- thrust::host_vector val; + cudf::detail::host_vector val; std::vector null_mask; cudf::size_type null_count = 0; - host_column(size_type total_row_groups) - : val(total_row_groups), + host_column(size_type total_row_groups, rmm::cuda_stream_view stream) + : val{cudf::detail::make_host_vector(total_row_groups, stream)}, null_mask( cudf::util::div_rounding_up_safe( cudf::bitmask_allocation_size_bytes(total_row_groups), sizeof(bitmask_type)), @@ -170,8 +170,14 @@ struct stats_caster { rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - std::vector chars{}; - std::vector offsets(1, 0); + auto const total_char_count = std::accumulate( + host_strings.begin(), host_strings.end(), 0, [](auto sum, auto const& str) { + return sum + str.size_bytes(); + }); + auto chars = cudf::detail::make_empty_host_vector(total_char_count, stream); + auto offsets = + cudf::detail::make_empty_host_vector(host_strings.size() + 1, stream); + offsets.push_back(0); for (auto const& str : host_strings) { auto tmp = str.empty() ? 
std::string_view{} : std::string_view(str.data(), str.size_bytes()); @@ -206,8 +212,8 @@ struct stats_caster { null_count); } }; // local struct host_column - host_column min(total_row_groups); - host_column max(total_row_groups); + host_column min(total_row_groups, stream); + host_column max(total_row_groups, stream); size_type stats_idx = 0; for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) { for (auto const rg_idx : row_group_indices[src_idx]) { diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 05e0d8c0111..794750ab6d2 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -804,16 +804,16 @@ std::vector compute_page_splits_by_row(device_span> comp_in; - comp_in.reserve(num_comp_pages); - std::vector> comp_out; - comp_out.reserve(num_comp_pages); + auto comp_in = + cudf::detail::make_empty_host_vector>(num_comp_pages, stream); + auto comp_out = + cudf::detail::make_empty_host_vector>(num_comp_pages, stream); // vectors to save v2 def and rep level data, if any - std::vector> copy_in; - copy_in.reserve(num_comp_pages); - std::vector> copy_out; - copy_out.reserve(num_comp_pages); + auto copy_in = + cudf::detail::make_empty_host_vector>(num_comp_pages, stream); + auto copy_out = + cudf::detail::make_empty_host_vector>(num_comp_pages, stream); rmm::device_uvector comp_res(num_comp_pages, stream); thrust::fill(rmm::exec_policy_nosync(stream), @@ -822,7 +822,6 @@ std::vector compute_page_splits_by_row(device_span compute_page_splits_by_row(device_span(offset)}); + copy_out.push_back({dst_base, static_cast(offset)}); } - comp_in.emplace_back(page.page_data + offset, - static_cast(page.compressed_page_size - offset)); - comp_out.emplace_back(dst_base + offset, - static_cast(page.uncompressed_page_size - offset)); + comp_in.push_back( + {page.page_data + offset, static_cast(page.compressed_page_size - offset)}); + comp_out.push_back( + 
{dst_base + offset, static_cast(page.uncompressed_page_size - offset)}); page.page_data = dst_base; decomp_offset += page.uncompressed_page_size; }); + } + auto d_comp_in = cudf::detail::make_device_uvector_async( + comp_in, stream, rmm::mr::get_current_device_resource()); + auto d_comp_out = cudf::detail::make_device_uvector_async( + comp_out, stream, rmm::mr::get_current_device_resource()); + + int32_t start_pos = 0; + for (auto const& codec : codecs) { + if (codec.num_pages == 0) { continue; } + + device_span const> d_comp_in_view{d_comp_in.data() + start_pos, + codec.num_pages}; + + device_span const> d_comp_out_view(d_comp_out.data() + start_pos, + codec.num_pages); - host_span const> comp_in_view{comp_in.data() + start_pos, - codec.num_pages}; - auto const d_comp_in = cudf::detail::make_device_uvector_async( - comp_in_view, stream, rmm::mr::get_current_device_resource()); - host_span const> comp_out_view(comp_out.data() + start_pos, - codec.num_pages); - auto const d_comp_out = cudf::detail::make_device_uvector_async( - comp_out_view, stream, rmm::mr::get_current_device_resource()); device_span d_comp_res_view(comp_res.data() + start_pos, codec.num_pages); switch (codec.compression_type) { case GZIP: - gpuinflate(d_comp_in, d_comp_out, d_comp_res_view, gzip_header_included::YES, stream); + gpuinflate( + d_comp_in_view, d_comp_out_view, d_comp_res_view, gzip_header_included::YES, stream); break; case SNAPPY: if (cudf::io::nvcomp_integration::is_stable_enabled()) { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, - d_comp_in, - d_comp_out, + d_comp_in_view, + d_comp_out_view, d_comp_res_view, codec.max_decompressed_size, codec.total_decomp_size, stream); } else { - gpu_unsnap(d_comp_in, d_comp_out, d_comp_res_view, stream); + gpu_unsnap(d_comp_in_view, d_comp_out, d_comp_res_view, stream); } break; case ZSTD: nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, - d_comp_in, - d_comp_out, + d_comp_in_view, + d_comp_out_view, d_comp_res_view, 
codec.max_decompressed_size, codec.total_decomp_size, stream); break; case BROTLI: - gpu_debrotli(d_comp_in, - d_comp_out, + gpu_debrotli(d_comp_in_view, + d_comp_out_view, d_comp_res_view, debrotli_scratch.data(), debrotli_scratch.size(), @@ -893,8 +900,8 @@ std::vector compute_page_splits_by_row(device_span chunk decomp_sum{}); // retrieve to host so we can call nvcomp to get compression scratch sizes - std::vector h_decomp_info = - cudf::detail::make_std_vector_sync(decomp_info, stream); - std::vector temp_cost(pages.size()); + auto h_decomp_info = cudf::detail::make_host_vector_sync(decomp_info, stream); + auto temp_cost = cudf::detail::make_host_vector(pages.size(), stream); thrust::transform(thrust::host, h_decomp_info.begin(), h_decomp_info.end(), diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index ff47dfc4cf3..e006cc7d714 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -370,7 +370,7 @@ void fill_in_page_info(host_span chunks, rmm::cuda_stream_view stream) { auto const num_pages = pages.size(); - std::vector page_indexes(num_pages); + auto page_indexes = cudf::detail::make_host_vector(num_pages, stream); for (size_t c = 0, page_count = 0; c < chunks.size(); c++) { auto const& chunk = chunks[c]; @@ -1031,8 +1031,8 @@ struct get_page_num_rows { }; struct input_col_info { - int const schema_idx; - size_type const nesting_depth; + int schema_idx; + size_type nesting_depth; }; /** @@ -1523,8 +1523,8 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - std::vector h_cols_info; - h_cols_info.reserve(_input_columns.size()); + auto h_cols_info = + cudf::detail::make_empty_host_vector(_input_columns.size(), _stream); std::transform(_input_columns.cbegin(), _input_columns.cend(), std::back_inserter(h_cols_info), diff --git 
a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 8413e716224..2df71b77301 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1824,7 +1824,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_type max_page_fragment_size = max_page_fragment_size_opt.value_or(default_max_page_fragment_size); - std::vector column_frag_size(num_columns, max_page_fragment_size); + auto column_frag_size = cudf::detail::make_host_vector(num_columns, stream); + std::fill(column_frag_size.begin(), column_frag_size.end(), max_page_fragment_size); if (input.num_rows() > 0 && not max_page_fragment_size_opt.has_value()) { std::vector column_sizes; @@ -1880,7 +1881,9 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_type num_fragments = std::reduce(num_frag_in_part.begin(), num_frag_in_part.end()); - std::vector part_frag_offset; // Store the idx of the first fragment in each partition + auto part_frag_offset = + cudf::detail::make_empty_host_vector(num_frag_in_part.size() + 1, stream); + // Store the idx of the first fragment in each partition std::exclusive_scan( num_frag_in_part.begin(), num_frag_in_part.end(), std::back_inserter(part_frag_offset), 0); part_frag_offset.push_back(part_frag_offset.back() + num_frag_in_part.back()); diff --git a/cpp/src/lists/dremel.cu b/cpp/src/lists/dremel.cu index 5625e1bf05c..50f40924478 100644 --- a/cpp/src/lists/dremel.cu +++ b/cpp/src/lists/dremel.cu @@ -257,10 +257,8 @@ dremel_data get_encoding(column_view h_col, }, stream); - thrust::host_vector column_offsets = - cudf::detail::make_host_vector_async(d_column_offsets, stream); - thrust::host_vector column_ends = - cudf::detail::make_host_vector_async(d_column_ends, stream); + auto column_offsets = cudf::detail::make_host_vector_async(d_column_offsets, stream); + auto column_ends = cudf::detail::make_host_vector_async(d_column_ends, stream); stream.synchronize(); size_t max_vals_size = 
0; diff --git a/cpp/src/strings/combine/join.cu b/cpp/src/strings/combine/join.cu index c4cc0dbe09d..b534e9b2e5b 100644 --- a/cpp/src/strings/combine/join.cu +++ b/cpp/src/strings/combine/join.cu @@ -169,8 +169,10 @@ std::unique_ptr join_strings(strings_column_view const& input, // build the offsets: single string output has offsets [0,chars-size] auto offsets_column = [&] { - auto offsets = cudf::detail::make_device_uvector_async( - std::vector({0, static_cast(chars.size())}), stream, mr); + auto h_offsets = cudf::detail::make_host_vector(2, stream); + h_offsets[0] = 0; + h_offsets[1] = chars.size(); + auto offsets = cudf::detail::make_device_uvector_async(h_offsets, stream, mr); return std::make_unique(std::move(offsets), rmm::device_buffer{}, 0); }(); diff --git a/cpp/src/strings/convert/convert_datetime.cu b/cpp/src/strings/convert/convert_datetime.cu index 2f4ebf97264..64a2107e17a 100644 --- a/cpp/src/strings/convert/convert_datetime.cu +++ b/cpp/src/strings/convert/convert_datetime.cu @@ -123,7 +123,7 @@ struct format_compiler { : format(fmt), d_items(0, stream) { specifiers.insert(extra_specifiers.begin(), extra_specifiers.end()); - std::vector items; + auto items = cudf::detail::make_empty_host_vector(format.length(), stream); auto str = format.data(); auto length = format.length(); while (length > 0) { diff --git a/cpp/src/strings/copying/concatenate.cu b/cpp/src/strings/copying/concatenate.cu index 7622e39e735..352e0f9f41a 100644 --- a/cpp/src/strings/copying/concatenate.cu +++ b/cpp/src/strings/copying/concatenate.cu @@ -79,7 +79,7 @@ auto create_strings_device_views(host_span views, rmm::cuda_s // Compute the partition offsets and size of offset column // Note: Using 64-bit size_t so we can detect overflow of 32-bit size_type - auto input_offsets = std::vector(views.size() + 1); + auto input_offsets = cudf::detail::make_host_vector(views.size() + 1, stream); auto offset_it = std::next(input_offsets.begin()); thrust::transform( thrust::host, 
views.begin(), views.end(), offset_it, [](auto const& col) -> size_t { diff --git a/cpp/src/strings/filter_chars.cu b/cpp/src/strings/filter_chars.cu index a34828fa97e..48620af8cad 100644 --- a/cpp/src/strings/filter_chars.cu +++ b/cpp/src/strings/filter_chars.cu @@ -129,7 +129,7 @@ std::unique_ptr filter_characters( // convert input table for copy to device memory size_type table_size = static_cast(characters_to_filter.size()); - thrust::host_vector htable(table_size); + auto htable = cudf::detail::make_host_vector(table_size, stream); std::transform( characters_to_filter.begin(), characters_to_filter.end(), htable.begin(), [](auto entry) { return char_range{entry.first, entry.second}; diff --git a/cpp/src/strings/replace/multi_re.cu b/cpp/src/strings/replace/multi_re.cu index cd60a4296b9..31234ea42ec 100644 --- a/cpp/src/strings/replace/multi_re.cu +++ b/cpp/src/strings/replace/multi_re.cu @@ -171,7 +171,7 @@ std::unique_ptr replace_re(strings_column_view const& input, auto d_buffer = rmm::device_buffer(buffer_size, stream); // copy all the reprog_device instances to a device memory array - std::vector progs; + auto progs = cudf::detail::make_empty_host_vector(h_progs.size(), stream); std::transform(h_progs.begin(), h_progs.end(), std::back_inserter(progs), diff --git a/cpp/src/strings/translate.cu b/cpp/src/strings/translate.cu index 16b22d0de4c..a242b008a54 100644 --- a/cpp/src/strings/translate.cu +++ b/cpp/src/strings/translate.cu @@ -97,7 +97,7 @@ std::unique_ptr translate(strings_column_view const& strings, size_type table_size = static_cast(chars_table.size()); // convert input table - thrust::host_vector htable(table_size); + auto htable = cudf::detail::make_host_vector(table_size, stream); std::transform(chars_table.begin(), chars_table.end(), htable.begin(), [](auto entry) { return translate_table{entry.first, entry.second}; }); diff --git a/cpp/src/table/row_operators.cu b/cpp/src/table/row_operators.cu index 13c31e8ae4c..2969557c78f 100644 --- 
a/cpp/src/table/row_operators.cu +++ b/cpp/src/table/row_operators.cu @@ -308,7 +308,10 @@ auto decompose_structs(table_view table, auto list_lex_preprocess(table_view const& table, rmm::cuda_stream_view stream) { std::vector dremel_data; - std::vector dremel_device_views; + auto const num_list_columns = std::count_if( + table.begin(), table.end(), [](auto const& col) { return col.type().id() == type_id::LIST; }); + auto dremel_device_views = + cudf::detail::make_empty_host_vector(num_list_columns, stream); for (auto const& col : table) { if (col.type().id() == type_id::LIST) { dremel_data.push_back(detail::get_comparator_data(col, {}, false, stream)); diff --git a/cpp/src/utilities/cuda_memcpy.cu b/cpp/src/utilities/cuda_memcpy.cu index 3d0822d8545..0efb881eb3e 100644 --- a/cpp/src/utilities/cuda_memcpy.cu +++ b/cpp/src/utilities/cuda_memcpy.cu @@ -14,6 +14,9 @@ * limitations under the License. */ +#include "cudf/detail/utilities/integer_utils.hpp" + +#include #include #include #include @@ -26,15 +29,24 @@ namespace cudf::detail { namespace { +// Simple kernel to copy between device buffers +CUDF_KERNEL void copy_kernel(char const* src, char* dst, size_t n) +{ + auto const idx = cudf::detail::grid_1d::global_thread_id(); + if (idx < n) { dst[idx] = src[idx]; } +} + void copy_pinned(void* dst, void const* src, std::size_t size, rmm::cuda_stream_view stream) { if (size == 0) return; if (size < get_kernel_pinned_copy_threshold()) { - thrust::copy_n(rmm::exec_policy_nosync(stream), - static_cast(src), - size, - static_cast(dst)); + const int block_size = 256; + auto const grid_size = cudf::util::div_rounding_up_safe(size, block_size); + // We are explicitly launching the kernel here instead of calling a thrust function because the + // thrust function can potentially call cudaMemcpyAsync instead of using a kernel + copy_kernel<<>>( + static_cast(src), static_cast(dst), size); } else { CUDF_CUDA_TRY(cudaMemcpyAsync(dst, src, size, cudaMemcpyDefault, stream)); } diff 
--git a/cpp/src/utilities/pinned_memory.cpp b/cpp/src/utilities/host_memory.cpp similarity index 73% rename from cpp/src/utilities/pinned_memory.cpp rename to cpp/src/utilities/host_memory.cpp index 3ea4293fc60..7c3cea42023 100644 --- a/cpp/src/utilities/pinned_memory.cpp +++ b/cpp/src/utilities/host_memory.cpp @@ -83,7 +83,7 @@ class fixed_pinned_pool_memory_resource { void deallocate_async(void* ptr, std::size_t bytes, std::size_t alignment, - cuda::stream_ref stream) noexcept + cuda::stream_ref stream) { if (bytes <= pool_size_ && ptr >= pool_begin_ && ptr < pool_end_) { pool_->deallocate_async(ptr, bytes, alignment, stream); @@ -92,14 +92,14 @@ class fixed_pinned_pool_memory_resource { } } - void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept + void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) { return deallocate_async(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); } void deallocate(void* ptr, std::size_t bytes, - std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept + std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) { deallocate_async(ptr, bytes, alignment, stream_); stream_.wait(); @@ -186,6 +186,61 @@ CUDF_EXPORT rmm::host_device_async_resource_ref& host_mr() return mr_ref; } +class new_delete_memory_resource { + public: + void* allocate(std::size_t bytes, std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) + { + try { + return rmm::detail::aligned_host_allocate( + bytes, alignment, [](std::size_t size) { return ::operator new(size); }); + } catch (std::bad_alloc const& e) { + CUDF_FAIL("Failed to allocate memory: " + std::string{e.what()}, rmm::out_of_memory); + } + } + + void* allocate_async(std::size_t bytes, [[maybe_unused]] cuda::stream_ref stream) + { + return allocate(bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT); + } + + void* allocate_async(std::size_t bytes, + std::size_t alignment, + [[maybe_unused]] cuda::stream_ref stream) + { + return allocate(bytes, 
alignment); + } + + void deallocate(void* ptr, + std::size_t bytes, + std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) + { + rmm::detail::aligned_host_deallocate( + ptr, bytes, alignment, [](void* ptr) { ::operator delete(ptr); }); + } + + void deallocate_async(void* ptr, + std::size_t bytes, + std::size_t alignment, + [[maybe_unused]] cuda::stream_ref stream) + { + deallocate(ptr, bytes, alignment); + } + + void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) + { + deallocate(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT); + } + + bool operator==(new_delete_memory_resource const& other) const { return true; } + + bool operator!=(new_delete_memory_resource const& other) const { return !operator==(other); } + + friend void get_property(new_delete_memory_resource const&, cuda::mr::host_accessible) noexcept {} +}; + +static_assert(cuda::mr::resource_with, + "Pageable pool mr must be accessible from the host"); + } // namespace rmm::host_device_async_resource_ref set_pinned_memory_resource( @@ -225,4 +280,29 @@ void set_kernel_pinned_copy_threshold(size_t threshold) size_t get_kernel_pinned_copy_threshold() { return kernel_pinned_copy_threshold(); } +CUDF_EXPORT auto& allocate_host_as_pinned_threshold() +{ + // use pageable memory for all host allocations + static std::atomic threshold = 0; + return threshold; +} + +void set_allocate_host_as_pinned_threshold(size_t threshold) +{ + allocate_host_as_pinned_threshold() = threshold; +} + +size_t get_allocate_host_as_pinned_threshold() { return allocate_host_as_pinned_threshold(); } + +namespace detail { + +CUDF_EXPORT rmm::host_async_resource_ref get_pageable_memory_resource() +{ + static new_delete_memory_resource mr{}; + static rmm::host_async_resource_ref mr_ref{mr}; + return mr_ref; +} + +} // namespace detail + } // namespace cudf diff --git a/cpp/tests/io/json/json_tree.cpp b/cpp/tests/io/json/json_tree.cpp index 7a72b77e1fb..8bcd5790e99 100644 --- a/cpp/tests/io/json/json_tree.cpp +++ 
b/cpp/tests/io/json/json_tree.cpp @@ -235,10 +235,8 @@ tree_meta_t2 get_tree_representation_cpu( { constexpr bool include_quote_char = true; // Copy the JSON tokens to the host - thrust::host_vector tokens = - cudf::detail::make_host_vector_async(tokens_gpu, stream); - thrust::host_vector token_indices = - cudf::detail::make_host_vector_async(token_indices_gpu1, stream); + auto tokens = cudf::detail::make_host_vector_async(tokens_gpu, stream); + auto token_indices = cudf::detail::make_host_vector_async(token_indices_gpu1, stream); // Make sure tokens have been copied to the host stream.synchronize(); diff --git a/cpp/tests/strings/integers_tests.cpp b/cpp/tests/strings/integers_tests.cpp index 51e9b3bd0a0..7a038fa6d75 100644 --- a/cpp/tests/strings/integers_tests.cpp +++ b/cpp/tests/strings/integers_tests.cpp @@ -294,7 +294,7 @@ TYPED_TEST(StringsIntegerConvertTest, FromToInteger) std::iota(h_integers.begin(), h_integers.end(), -(TypeParam)(h_integers.size() / 2)); h_integers.push_back(std::numeric_limits::min()); h_integers.push_back(std::numeric_limits::max()); - auto d_integers = cudf::detail::make_device_uvector_sync( + auto const d_integers = cudf::detail::make_device_uvector_sync( h_integers, cudf::get_default_stream(), rmm::mr::get_current_device_resource()); auto integers = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, (cudf::size_type)d_integers.size()); @@ -308,8 +308,6 @@ TYPED_TEST(StringsIntegerConvertTest, FromToInteger) // convert to strings auto results_strings = cudf::strings::from_integers(integers->view()); - // copy back to host - h_integers = cudf::detail::make_host_vector_sync(d_integers, cudf::get_default_stream()); std::vector h_strings; for (auto itr = h_integers.begin(); itr != h_integers.end(); ++itr) h_strings.push_back(std::to_string(*itr)); diff --git a/cpp/tests/utilities_tests/pinned_memory_tests.cpp b/cpp/tests/utilities_tests/pinned_memory_tests.cpp index df9103640f4..93259fd63ee 100644 --- 
a/cpp/tests/utilities_tests/pinned_memory_tests.cpp +++ b/cpp/tests/utilities_tests/pinned_memory_tests.cpp @@ -18,16 +18,33 @@ #include #include +#include #include +#include #include #include #include #include -class PinnedMemoryTest : public cudf::test::BaseFixture {}; +class PinnedMemoryTest : public cudf::test::BaseFixture { + size_t prev_copy_threshold; + size_t prev_alloc_threshold; -TEST(PinnedMemoryTest, MemoryResourceGetAndSet) + public: + PinnedMemoryTest() + : prev_copy_threshold{cudf::get_kernel_pinned_copy_threshold()}, + prev_alloc_threshold{cudf::get_allocate_host_as_pinned_threshold()} + { + } + ~PinnedMemoryTest() override + { + cudf::set_kernel_pinned_copy_threshold(prev_copy_threshold); + cudf::set_allocate_host_as_pinned_threshold(prev_alloc_threshold); + } +}; + +TEST_F(PinnedMemoryTest, MemoryResourceGetAndSet) { // Global environment for temporary files auto const temp_env = static_cast( @@ -63,3 +80,49 @@ TEST(PinnedMemoryTest, MemoryResourceGetAndSet) // reset memory resource back cudf::set_pinned_memory_resource(last_mr); } + +TEST_F(PinnedMemoryTest, KernelCopyThresholdGetAndSet) +{ + cudf::set_kernel_pinned_copy_threshold(12345); + EXPECT_EQ(cudf::get_kernel_pinned_copy_threshold(), 12345); +} + +TEST_F(PinnedMemoryTest, HostAsPinnedThresholdGetAndSet) +{ + cudf::set_allocate_host_as_pinned_threshold(12345); + EXPECT_EQ(cudf::get_allocate_host_as_pinned_threshold(), 12345); +} + +TEST_F(PinnedMemoryTest, MakePinnedVector) +{ + cudf::set_allocate_host_as_pinned_threshold(0); + + // should always use pinned memory + { + auto const vec = cudf::detail::make_pinned_vector_async(1, cudf::get_default_stream()); + EXPECT_TRUE(vec.get_allocator().is_device_accessible()); + } +} + +TEST_F(PinnedMemoryTest, MakeHostVector) +{ + cudf::set_allocate_host_as_pinned_threshold(7); + + // allocate smaller than the threshold + { + auto const vec = cudf::detail::make_host_vector(1, cudf::get_default_stream()); + 
EXPECT_TRUE(vec.get_allocator().is_device_accessible()); + } + + // allocate the same size as the threshold + { + auto const vec = cudf::detail::make_host_vector(7, cudf::get_default_stream()); + EXPECT_TRUE(vec.get_allocator().is_device_accessible()); + } + + // allocate larger than the threshold + { + auto const vec = cudf::detail::make_host_vector(2, cudf::get_default_stream()); + EXPECT_FALSE(vec.get_allocator().is_device_accessible()); + } +}