diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ca85996b990..aab0a9b2d49 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -664,6 +664,7 @@ add_library( src/utilities/default_stream.cpp src/utilities/linked_column.cpp src/utilities/logger.cpp + src/utilities/pinned_memory.cpp src/utilities/stacktrace.cpp src/utilities/stream_pool.cpp src/utilities/traits.cpp diff --git a/cpp/benchmarks/fixture/nvbench_fixture.hpp b/cpp/benchmarks/fixture/nvbench_fixture.hpp index ebcbcb17e98..df1492690bb 100644 --- a/cpp/benchmarks/fixture/nvbench_fixture.hpp +++ b/cpp/benchmarks/fixture/nvbench_fixture.hpp @@ -15,8 +15,8 @@ */ #pragma once -#include #include +#include #include #include @@ -81,17 +81,18 @@ struct nvbench_base_fixture { "\nExpecting: cuda, pool, async, arena, managed, or managed_pool"); } - inline rmm::host_async_resource_ref make_cuio_host_pinned() + inline rmm::host_device_async_resource_ref make_cuio_host_pinned() { static std::shared_ptr mr = std::make_shared(); return *mr; } - inline rmm::host_async_resource_ref create_cuio_host_memory_resource(std::string const& mode) + inline rmm::host_device_async_resource_ref create_cuio_host_memory_resource( + std::string const& mode) { if (mode == "pinned") return make_cuio_host_pinned(); - if (mode == "pinned_pool") return cudf::io::get_host_memory_resource(); + if (mode == "pinned_pool") return cudf::get_pinned_memory_resource(); CUDF_FAIL("Unknown cuio_host_mem parameter: " + mode + "\nExpecting: pinned or pinned_pool"); } @@ -112,14 +113,14 @@ struct nvbench_base_fixture { rmm::mr::set_current_device_resource(mr.get()); std::cout << "RMM memory resource = " << rmm_mode << "\n"; - cudf::io::set_host_memory_resource(create_cuio_host_memory_resource(cuio_host_mode)); + cudf::set_pinned_memory_resource(create_cuio_host_memory_resource(cuio_host_mode)); std::cout << "CUIO host memory resource = " << cuio_host_mode << "\n"; } ~nvbench_base_fixture() { // Ensure the the pool is freed before the CUDA context is destroyed: - cudf::io::set_host_memory_resource(this->make_cuio_host_pinned()); + cudf::set_pinned_memory_resource(this->make_cuio_host_pinned()); } std::shared_ptr mr; diff --git a/cpp/benchmarks/io/cuio_common.cpp b/cpp/benchmarks/io/cuio_common.cpp index 37ced8ea703..645994f3f0d 100644 --- a/cpp/benchmarks/io/cuio_common.cpp +++ b/cpp/benchmarks/io/cuio_common.cpp @@ -19,6 +19,9 @@ #include #include +#include +#include + #include #include @@ -28,6 +31,14 @@ temp_directory const cuio_source_sink_pair::tmpdir{"cudf_gbench"}; +// Don't use cudf's pinned pool for the source data +rmm::host_async_resource_ref pinned_memory_resource() +{ + static rmm::mr::pinned_host_memory_resource mr = rmm::mr::pinned_host_memory_resource{}; + + return mr; +} + std::string random_file_in_dir(std::string const& dir_path) { // `mkstemp` modifies the template in place @@ -41,6 +52,7 @@ std::string random_file_in_dir(std::string const& dir_path) cuio_source_sink_pair::cuio_source_sink_pair(io_type type) : type{type}, + pinned_buffer({pinned_memory_resource(), cudf::get_default_stream()}), d_buffer{0, cudf::get_default_stream()}, file_name{random_file_in_dir(tmpdir.path())}, void_sink{cudf::io::data_sink::create()} diff --git a/cpp/benchmarks/io/cuio_common.hpp b/cpp/benchmarks/io/cuio_common.hpp index d4f39a5f243..64d6021cf50 100644 --- a/cpp/benchmarks/io/cuio_common.hpp +++ b/cpp/benchmarks/io/cuio_common.hpp @@ -18,7 +18,7 @@ #include -#include +#include #include #include @@ -79,7 +79,7 @@ class cuio_source_sink_pair { io_type const type; std::vector h_buffer; - cudf::detail::pinned_host_vector pinned_buffer; + cudf::detail::host_vector pinned_buffer; rmm::device_uvector d_buffer; std::string const file_name; std::unique_ptr void_sink; diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp index a67d1932951..b4c8ed78ed8 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -20,9 +20,9 @@ #include #include -#include #include #include +#include #include #include diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index b5d855d8881..67705863d41 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -132,9 +131,10 @@ static void bench_multibyte_split(nvbench::state& state, auto const delim_factor = static_cast(delim_percent) / 100; std::unique_ptr datasource; - auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim); - auto host_input = std::vector{}; - auto host_pinned_input = cudf::detail::pinned_host_vector{}; + auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim); + auto host_input = std::vector{}; + auto host_pinned_input = + cudf::detail::make_pinned_vector_async(0, cudf::get_default_stream()); if (source_type != data_chunk_source_type::device && source_type != data_chunk_source_type::host_pinned) { diff --git a/cpp/include/cudf/detail/utilities/rmm_host_vector.hpp b/cpp/include/cudf/detail/utilities/host_vector.hpp similarity index 93% rename from cpp/include/cudf/detail/utilities/rmm_host_vector.hpp rename to cpp/include/cudf/detail/utilities/host_vector.hpp index 6901a19473e..6a115177ab5 100644 --- a/cpp/include/cudf/detail/utilities/rmm_host_vector.hpp +++ b/cpp/include/cudf/detail/utilities/host_vector.hpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -32,8 +33,6 @@ namespace cudf::detail { /*! \p rmm_host_allocator is a CUDA-specific host memory allocator * that employs \c a `rmm::host_async_resource_ref` for allocation. * - * This implementation is ported from pinned_host_vector in cudf. - * * \see https://en.cppreference.com/w/cpp/memory/allocator */ template @@ -42,8 +41,6 @@ class rmm_host_allocator; /*! \p rmm_host_allocator is a CUDA-specific host memory allocator * that employs \c an `cudf::host_async_resource_ref` for allocation. * - * This implementation is ported from pinned_host_vector in cudf. - * * \see https://en.cppreference.com/w/cpp/memory/allocator */ template <> @@ -70,8 +67,7 @@ class rmm_host_allocator { * The \p rmm_host_allocator provides an interface for host memory allocation through the user * provided \c `rmm::host_async_resource_ref`. The \p rmm_host_allocator does not take ownership of * this reference and therefore it is the user's responsibility to ensure its lifetime for the - * duration of the lifetime of the \p rmm_host_allocator. This implementation is ported from - * pinned_host_vector in cudf. + * duration of the lifetime of the \p rmm_host_allocator. * * \see https://en.cppreference.com/w/cpp/memory/allocator */ @@ -121,8 +117,12 @@ class rmm_host_allocator { inline pointer allocate(size_type cnt) { if (cnt > this->max_size()) { throw std::bad_alloc(); } // end if - return static_cast( - mr.allocate_async(cnt * sizeof(value_type), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream)); + auto const result = + mr.allocate_async(cnt * sizeof(value_type), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + // Synchronize to ensure the memory is allocated before thrust::host_vector initialization + // TODO: replace thrust::host_vector with a type that does not require synchronization + stream.synchronize(); + return static_cast(result); } /** @@ -182,6 +182,6 @@ class rmm_host_allocator { * @brief A vector class with rmm host memory allocator */ template -using rmm_host_vector = thrust::host_vector>; +using host_vector = thrust::host_vector>; } // namespace cudf::detail diff --git a/cpp/include/cudf/detail/utilities/pinned_host_vector.hpp b/cpp/include/cudf/detail/utilities/pinned_host_vector.hpp deleted file mode 100644 index c22b6a6ba15..00000000000 --- a/cpp/include/cudf/detail/utilities/pinned_host_vector.hpp +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2008-2024, NVIDIA CORPORATION - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include - -#include -#include -#include // for bad_alloc - -namespace cudf::detail { - -/*! \p pinned_allocator is a CUDA-specific host memory allocator - * that employs \c cudaMallocHost for allocation. - * - * This implementation is ported from the experimental/pinned_allocator - * that Thrust used to provide. - * - * \see https://en.cppreference.com/w/cpp/memory/allocator - */ -template -class pinned_allocator; - -/*! \p pinned_allocator is a CUDA-specific host memory allocator - * that employs \c cudaMallocHost for allocation. - * - * This implementation is ported from the experimental/pinned_allocator - * that Thrust used to provide. - * - * \see https://en.cppreference.com/w/cpp/memory/allocator - */ -template <> -class pinned_allocator { - public: - using value_type = void; ///< The type of the elements in the allocator - using pointer = void*; ///< The type returned by address() / allocate() - using const_pointer = void const*; ///< The type returned by address() - using size_type = std::size_t; ///< The type used for the size of the allocation - using difference_type = std::ptrdiff_t; ///< The type of the distance between two pointers - - /** - * @brief converts a `pinned_allocator` to `pinned_allocator` - */ - template - struct rebind { - using other = pinned_allocator; ///< The rebound type - }; -}; - -/*! \p pinned_allocator is a CUDA-specific host memory allocator - * that employs \c cudaMallocHost for allocation. - * - * This implementation is ported from the experimental/pinned_allocator - * that Thrust used to provide. - * - * \see https://en.cppreference.com/w/cpp/memory/allocator - */ -template -class pinned_allocator { - public: - using value_type = T; ///< The type of the elements in the allocator - using pointer = T*; ///< The type returned by address() / allocate() - using const_pointer = T const*; ///< The type returned by address() - using reference = T&; ///< The parameter type for address() - using const_reference = T const&; ///< The parameter type for address() - using size_type = std::size_t; ///< The type used for the size of the allocation - using difference_type = std::ptrdiff_t; ///< The type of the distance between two pointers - - /** - * @brief converts a `pinned_allocator` to `pinned_allocator` - */ - template - struct rebind { - using other = pinned_allocator; ///< The rebound type - }; - - /** - * @brief pinned_allocator's null constructor does nothing. - */ - __host__ __device__ inline pinned_allocator() {} - - /** - * @brief pinned_allocator's null destructor does nothing. - */ - __host__ __device__ inline ~pinned_allocator() {} - - /** - * @brief pinned_allocator's copy constructor does nothing. - */ - __host__ __device__ inline pinned_allocator(pinned_allocator const&) {} - - /** - * @brief pinned_allocator's copy constructor does nothing. - * - * This version of pinned_allocator's copy constructor - * is templated on the \c value_type of the pinned_allocator - * to copy from. It is provided merely for convenience; it - * does nothing. - */ - template - __host__ __device__ inline pinned_allocator(pinned_allocator const&) - { - } - - /** - * @brief This method returns the address of a \c reference of - * interest. - * - * @param r The \c reference of interest. - * @return \c r's address. - */ - __host__ __device__ inline pointer address(reference r) { return &r; } - - /** - * @brief This method returns the address of a \c const_reference - * of interest. - * - * @param r The \c const_reference of interest. - * @return \c r's address. - */ - __host__ __device__ inline const_pointer address(const_reference r) { return &r; } - - /** - * @brief This method allocates storage for objects in pinned host - * memory. - * - * @param cnt The number of objects to allocate. - * @return a \c pointer to the newly allocated objects. - * @note The second parameter to this function is meant as a - * hint pointer to a nearby memory location, but is - * not used by this allocator. - * @note This method does not invoke \p value_type's constructor. - * It is the responsibility of the caller to initialize the - * objects at the returned \c pointer. - */ - __host__ inline pointer allocate(size_type cnt, const_pointer /*hint*/ = 0) - { - if (cnt > this->max_size()) { throw std::bad_alloc(); } // end if - - pointer result(0); - CUDF_CUDA_TRY(cudaMallocHost(reinterpret_cast(&result), cnt * sizeof(value_type))); - return result; - } - - /** - * @brief This method deallocates pinned host memory previously allocated - * with this \c pinned_allocator. - * - * @param p A \c pointer to the previously allocated memory. - * @note The second parameter is the number of objects previously allocated - * but is ignored by this allocator. - * @note This method does not invoke \p value_type's destructor. - * It is the responsibility of the caller to destroy - * the objects stored at \p p. - */ - __host__ inline void deallocate(pointer p, size_type /*cnt*/) - { - auto dealloc_worked = cudaFreeHost(p); - (void)dealloc_worked; - assert(dealloc_worked == cudaSuccess); - } - - /** - * @brief This method returns the maximum size of the \c cnt parameter - * accepted by the \p allocate() method. - * - * @return The maximum number of objects that may be allocated - * by a single call to \p allocate(). - */ - inline size_type max_size() const { return (std::numeric_limits::max)() / sizeof(T); } - - /** - * @brief This method tests this \p pinned_allocator for equality to - * another. - * - * @param x The other \p pinned_allocator of interest. - * @return This method always returns \c true. - */ - __host__ __device__ inline bool operator==(pinned_allocator const& x) const { return true; } - - /** - * @brief This method tests this \p pinned_allocator for inequality - * to another. - * - * @param x The other \p pinned_allocator of interest. - * @return This method always returns \c false. - */ - __host__ __device__ inline bool operator!=(pinned_allocator const& x) const - { - return !operator==(x); - } -}; - -/** - * @brief A vector class with pinned host memory allocator - */ -template -using pinned_host_vector = thrust::host_vector>; - -} // namespace cudf::detail diff --git a/cpp/include/cudf/detail/utilities/vector_factories.hpp b/cpp/include/cudf/detail/utilities/vector_factories.hpp index 293a4096c57..20cb55bb1c7 100644 --- a/cpp/include/cudf/detail/utilities/vector_factories.hpp +++ b/cpp/include/cudf/detail/utilities/vector_factories.hpp @@ -21,8 +21,10 @@ * @file vector_factories.hpp */ +#include #include #include +#include #include #include @@ -380,7 +382,7 @@ thrust::host_vector make_host_vector_async(device_span v, rmm::cuda_ * @brief Asynchronously construct a `std::vector` containing a copy of data from a device * container * - * @note This function synchronizes `stream`. + * @note This function does not synchronize `stream`. * * @tparam Container The type of the container to copy from * @tparam T The type of the data to copy @@ -439,6 +441,40 @@ thrust::host_vector make_host_vector_sync( return make_host_vector_sync(device_span{c}, stream); } +/** + * @brief Asynchronously construct a pinned `cudf::detail::host_vector` of the given size + * + * @note This function may not synchronize `stream`. + * + * @tparam T The type of the vector data + * @param size The number of elements in the created vector + * @param stream The stream on which to allocate memory + * @return A host_vector of the given size + */ +template +host_vector make_pinned_vector_async(size_t size, rmm::cuda_stream_view stream) +{ + return host_vector(size, {cudf::get_pinned_memory_resource(), stream}); +} + +/** + * @brief Synchronously construct a pinned `cudf::detail::host_vector` of the given size + * + * @note This function synchronizes `stream`. + * + * @tparam T The type of the vector data + * @param size The number of elements in the created vector + * @param stream The stream on which to allocate memory + * @return A host_vector of the given size + */ +template +host_vector make_pinned_vector_sync(size_t size, rmm::cuda_stream_view stream) +{ + auto result = make_pinned_vector_async(size, stream); + stream.synchronize(); + return result; +} + } // namespace detail } // namespace cudf diff --git a/cpp/include/cudf/io/memory_resource.hpp b/cpp/include/cudf/io/memory_resource.hpp deleted file mode 100644 index a36e220ae7b..00000000000 --- a/cpp/include/cudf/io/memory_resource.hpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include - -namespace cudf::io { - -/** - * @brief Set the rmm resource to be used for host memory allocations by - * cudf::detail::hostdevice_vector - * - * hostdevice_vector is a utility class that uses a pair of host and device-side buffers for - * bouncing state between the cpu and the gpu. The resource set with this function (typically a - * pinned memory allocator) is what it uses to allocate space for it's host-side buffer. - * - * @param mr The rmm resource to be used for host-side allocations - * @return The previous resource that was in use - */ -rmm::host_async_resource_ref set_host_memory_resource(rmm::host_async_resource_ref mr); - -/** - * @brief Get the rmm resource being used for host memory allocations by - * cudf::detail::hostdevice_vector - * - * @return The rmm resource used for host-side allocations - */ -rmm::host_async_resource_ref get_host_memory_resource(); - -/** - * @brief Options to configure the default host memory resource - */ -struct host_mr_options { - std::optional pool_size; ///< The size of the pool to use for the default host memory - ///< resource. If not set, the default pool size is used. -}; - -/** - * @brief Configure the size of the default host memory resource. - * - * @throws cudf::logic_error if called after the default host memory resource has been created - * - * @param opts Options to configure the default host memory resource - * @return True if this call successfully configured the host memory resource, false if a - * a resource was already configured. - */ -bool config_default_host_memory_resource(host_mr_options const& opts); - -} // namespace cudf::io diff --git a/cpp/include/cudf/utilities/pinned_memory.hpp b/cpp/include/cudf/utilities/pinned_memory.hpp new file mode 100644 index 00000000000..b423eab6d38 --- /dev/null +++ b/cpp/include/cudf/utilities/pinned_memory.hpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace cudf { + +/** + * @brief Set the rmm resource to be used for pinned memory allocations. + * + * @param mr The rmm resource to be used for pinned allocations + * @return The previous resource that was in use + */ +rmm::host_device_async_resource_ref set_pinned_memory_resource( + rmm::host_device_async_resource_ref mr); + +/** + * @brief Get the rmm resource being used for pinned memory allocations. + * + * @return The rmm resource used for pinned allocations + */ +rmm::host_device_async_resource_ref get_pinned_memory_resource(); + +/** + * @brief Options to configure the default pinned memory resource + */ +struct pinned_mr_options { + std::optional pool_size; ///< The size of the pool to use for the default pinned memory + ///< resource. If not set, the default pool size is used. +}; + +/** + * @brief Configure the size of the default pinned memory resource. + * + * @param opts Options to configure the default pinned memory resource + * @return True if this call successfully configured the pinned memory resource, false if a + * a resource was already configured. + */ +bool config_default_pinned_memory_resource(pinned_mr_options const& opts); + +} // namespace cudf diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 5dee0c17a33..05faded651d 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -27,6 +27,7 @@ #include "io/utilities/parsing_utils.cuh" #include +#include #include #include #include diff --git a/cpp/src/io/orc/reader_impl_chunking.cu b/cpp/src/io/orc/reader_impl_chunking.cu index 5034aa14a95..43301826003 100644 --- a/cpp/src/io/orc/reader_impl_chunking.cu +++ b/cpp/src/io/orc/reader_impl_chunking.cu @@ -22,6 +22,7 @@ #include #include +#include #include #include diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 344e216cdc8..e9e031a407a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -2339,7 +2338,7 @@ auto convert_table_to_orc_data(table_view const& input, std::move(streams), std::move(stripes), std::move(stripe_dicts.views), - cudf::detail::pinned_host_vector()}; + cudf::detail::make_pinned_vector_async(0, stream)}; } // Allocate intermediate output stream buffer @@ -2407,7 +2406,7 @@ auto convert_table_to_orc_data(table_view const& input, return max_stream_size; }(); - cudf::detail::pinned_host_vector bounce_buffer(max_out_stream_size); + auto bounce_buffer = cudf::detail::make_pinned_vector_async(max_out_stream_size, stream); auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream); diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index eb653c6b9ac..9de8a9e2719 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -23,6 +23,8 @@ #include "ipc/Message_generated.h" #include "ipc/Schema_generated.h" +#include + #include #include diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 1dfced94f5b..6d466748c17 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -36,7 +36,6 @@ #include #include #include -#include #include #include #include @@ -2278,7 +2277,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } auto bounce_buffer = - cudf::detail::pinned_host_vector(all_device_write ? 0 : max_write_size); + cudf::detail::make_pinned_vector_async(all_device_write ? 0 : max_write_size, stream); return std::tuple{std::move(agg_meta), std::move(pages), diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index faa09e586ab..0e3ce779089 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -19,8 +19,9 @@ #include "io/utilities/config_utils.hpp" #include +#include #include -#include +#include #include #include #include @@ -66,7 +67,7 @@ struct bgzip_nvcomp_transform_functor { class bgzip_data_chunk_reader : public data_chunk_reader { private: template - static void copy_to_device(cudf::detail::pinned_host_vector const& host, + static void copy_to_device(cudf::detail::host_vector const& host, rmm::device_uvector& device, rmm::cuda_stream_view stream) { @@ -84,9 +85,9 @@ class bgzip_data_chunk_reader : public data_chunk_reader { 1 << 16; // 64k offset allocation, resized on demand cudaEvent_t event; - cudf::detail::pinned_host_vector h_compressed_blocks; - cudf::detail::pinned_host_vector h_compressed_offsets; - cudf::detail::pinned_host_vector h_decompressed_offsets; + cudf::detail::host_vector h_compressed_blocks; + cudf::detail::host_vector h_compressed_offsets; + cudf::detail::host_vector h_decompressed_offsets; rmm::device_uvector d_compressed_blocks; rmm::device_uvector d_decompressed_blocks; rmm::device_uvector d_compressed_offsets; @@ -103,7 +104,10 @@ class bgzip_data_chunk_reader : public data_chunk_reader { bool is_decompressed{}; decompression_blocks(rmm::cuda_stream_view init_stream) - : d_compressed_blocks(0, init_stream), + : h_compressed_blocks{cudf::detail::make_pinned_vector_async(0, init_stream)}, + h_compressed_offsets{cudf::detail::make_pinned_vector_async(0, init_stream)}, + h_decompressed_offsets{cudf::detail::make_pinned_vector_async(0, init_stream)}, + d_compressed_blocks(0, init_stream), d_decompressed_blocks(0, init_stream), d_compressed_offsets(0, init_stream), d_decompressed_offsets(0, init_stream), diff --git a/cpp/src/io/text/data_chunk_source_factories.cpp b/cpp/src/io/text/data_chunk_source_factories.cpp index 9d1d0498ace..596ca3458c8 100644 --- a/cpp/src/io/text/data_chunk_source_factories.cpp +++ b/cpp/src/io/text/data_chunk_source_factories.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,10 +14,12 @@ * limitations under the License. */ +#include "cudf/utilities/default_stream.hpp" #include "io/text/device_data_chunks.hpp" #include -#include +#include +#include #include #include @@ -31,8 +33,15 @@ namespace cudf::io::text { namespace { struct host_ticket { - cudaEvent_t event; - cudf::detail::pinned_host_vector buffer; + cudaEvent_t event{}; // tracks the completion of the last device-to-host copy. + cudf::detail::host_vector buffer; + + host_ticket() : buffer{cudf::detail::make_pinned_vector_sync(0, cudf::get_default_stream())} + { + cudaEventCreate(&event); + } + + ~host_ticket() { cudaEventDestroy(event); } }; /** @@ -43,20 +52,7 @@ class datasource_chunk_reader : public data_chunk_reader { constexpr static int num_tickets = 2; public: - datasource_chunk_reader(datasource* source) : _source(source) - { - // create an event to track the completion of the last device-to-host copy. - for (auto& ticket : _tickets) { - CUDF_CUDA_TRY(cudaEventCreate(&(ticket.event))); - } - } - - ~datasource_chunk_reader() override - { - for (auto& ticket : _tickets) { - CUDF_CUDA_TRY(cudaEventDestroy(ticket.event)); - } - } + datasource_chunk_reader(datasource* source) : _source(source) {} void skip_bytes(std::size_t size) override { @@ -84,7 +80,9 @@ class datasource_chunk_reader : public data_chunk_reader { CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); // resize the host buffer as necessary to contain the requested number of bytes - if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); } + if (h_ticket.buffer.size() < read_size) { + h_ticket.buffer = cudf::detail::make_pinned_vector_sync(read_size, stream); + } _source->host_read(_offset, read_size, reinterpret_cast(h_ticket.buffer.data())); @@ -120,17 +118,6 @@ class istream_data_chunk_reader : public data_chunk_reader { istream_data_chunk_reader(std::unique_ptr datastream) : _datastream(std::move(datastream)) { - // create an event to track the completion of the last device-to-host copy. - for (auto& ticket : _tickets) { - CUDF_CUDA_TRY(cudaEventCreate(&(ticket.event))); - } - } - - ~istream_data_chunk_reader() override - { - for (auto& ticket : _tickets) { - CUDF_CUDA_TRY(cudaEventDestroy(ticket.event)); - } } void skip_bytes(std::size_t size) override { _datastream->ignore(size); }; @@ -148,7 +135,9 @@ class istream_data_chunk_reader : public data_chunk_reader { CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); // resize the host buffer as necessary to contain the requested number of bytes - if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); } + if (h_ticket.buffer.size() < read_size) { + h_ticket.buffer = cudf::detail::make_pinned_vector_sync(read_size, stream); + } // read data from the host istream in to the pinned host memory buffer _datastream->read(h_ticket.buffer.data(), read_size); diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index dad1135e766..20ac89b4d53 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -16,22 +16,12 @@ #include "config_utils.hpp" -#include -#include #include -#include - -#include -#include -#include -#include #include #include -namespace cudf::io { - -namespace detail { +namespace cudf::io::detail { namespace cufile_integration { @@ -90,204 +80,4 @@ bool is_stable_enabled() { return is_all_enabled() or get_env_policy() == usage_ } // namespace nvcomp_integration -} // namespace detail - -namespace { -class fixed_pinned_pool_memory_resource { - using upstream_mr = rmm::mr::pinned_host_memory_resource; - using host_pooled_mr = rmm::mr::pool_memory_resource; - - private: - upstream_mr upstream_mr_{}; - size_t pool_size_{0}; - // Raw pointer to avoid a segfault when the pool is destroyed on exit - host_pooled_mr* pool_{nullptr}; - void* pool_begin_{nullptr}; - void* pool_end_{nullptr}; - cuda::stream_ref stream_{cudf::detail::global_cuda_stream_pool().get_stream().value()}; - - public: - fixed_pinned_pool_memory_resource(size_t size) - : pool_size_{size}, pool_{new host_pooled_mr(upstream_mr_, size, size)} - { - if (pool_size_ == 0) { return; } - - // Allocate full size from the pinned pool to figure out the beginning and end address - pool_begin_ = pool_->allocate_async(pool_size_, stream_); - pool_end_ = static_cast(static_cast(pool_begin_) + pool_size_); - pool_->deallocate_async(pool_begin_, pool_size_, stream_); - } - - void* do_allocate_async(std::size_t bytes, std::size_t alignment, cuda::stream_ref stream) - { - if (bytes <= pool_size_) { - try { - return pool_->allocate_async(bytes, alignment, stream); - } catch (...) { - // If the pool is exhausted, fall back to the upstream memory resource - } - } - - return upstream_mr_.allocate_async(bytes, alignment, stream); - } - - void do_deallocate_async(void* ptr, - std::size_t bytes, - std::size_t alignment, - cuda::stream_ref stream) noexcept - { - if (bytes <= pool_size_ && ptr >= pool_begin_ && ptr <= pool_end_) { - pool_->deallocate_async(ptr, bytes, alignment, stream); - } else { - upstream_mr_.deallocate_async(ptr, bytes, alignment, stream); - } - } - - void* allocate_async(std::size_t bytes, cuda::stream_ref stream) - { - return do_allocate_async(bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); - } - - void* allocate_async(std::size_t bytes, std::size_t alignment, cuda::stream_ref stream) - { - return do_allocate_async(bytes, alignment, stream); - } - - void* allocate(std::size_t bytes, std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) - { - auto const result = do_allocate_async(bytes, alignment, stream_); - stream_.wait(); - return result; - } - - void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept - { - return do_deallocate_async(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); - } - - void deallocate_async(void* ptr, - std::size_t bytes, - std::size_t alignment, - cuda::stream_ref stream) noexcept - { - return do_deallocate_async(ptr, bytes, alignment, stream); - } - - void deallocate(void* ptr, - std::size_t bytes, - std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept - { - deallocate_async(ptr, bytes, alignment, stream_); - stream_.wait(); - } - - bool operator==(fixed_pinned_pool_memory_resource const& other) const - { - return pool_ == other.pool_ and stream_ == other.stream_; - } - - bool operator!=(fixed_pinned_pool_memory_resource const& other) const - { - return !operator==(other); - } - - [[maybe_unused]] friend void get_property(fixed_pinned_pool_memory_resource const&, - cuda::mr::device_accessible) noexcept - { - } - - [[maybe_unused]] friend void get_property(fixed_pinned_pool_memory_resource const&, - cuda::mr::host_accessible) noexcept - { - } -}; - -static_assert(cuda::mr::resource_with, - ""); - -} // namespace - -CUDF_EXPORT rmm::host_async_resource_ref& make_default_pinned_mr(std::optional config_size) -{ - static fixed_pinned_pool_memory_resource mr = [config_size]() { - auto const size = [&config_size]() -> size_t { - if (auto const env_val = getenv("LIBCUDF_PINNED_POOL_SIZE"); env_val != nullptr) { - return std::atol(env_val); - } - - if (config_size.has_value()) { return *config_size; } - - size_t free{}, total{}; - CUDF_CUDA_TRY(cudaMemGetInfo(&free, &total)); - // 0.5% of the total device memory, capped at 100MB - return std::min(total / 200, size_t{100} * 1024 * 1024); - }(); - - // rmm requires the pool size to be a multiple of 256 bytes - auto const aligned_size = (size + 255) & ~255; - CUDF_LOG_INFO("Pinned pool size = {}", aligned_size); - - // make the pool with max size equal to the initial size - return fixed_pinned_pool_memory_resource{aligned_size}; - }(); - - static rmm::host_async_resource_ref mr_ref{mr}; - return mr_ref; -} - -CUDF_EXPORT std::mutex& host_mr_mutex() -{ - static std::mutex map_lock; - return map_lock; -} - -// Must be called with the host_mr_mutex mutex held -CUDF_EXPORT rmm::host_async_resource_ref& make_host_mr(std::optional const& opts, - bool* did_configure = nullptr) -{ - static rmm::host_async_resource_ref* mr_ref = nullptr; - bool configured = false; - if (mr_ref == nullptr) { - configured = true; - mr_ref = &make_default_pinned_mr(opts ? opts->pool_size : std::nullopt); - } - - // If the user passed an out param to detect whether this call configured a resource - // set the result - if (did_configure != nullptr) { *did_configure = configured; } - - return *mr_ref; -} - -// Must be called with the host_mr_mutex mutex held -CUDF_EXPORT rmm::host_async_resource_ref& host_mr() -{ - static rmm::host_async_resource_ref mr_ref = make_host_mr(std::nullopt); - return mr_ref; -} - -rmm::host_async_resource_ref set_host_memory_resource(rmm::host_async_resource_ref mr) -{ - std::scoped_lock lock{host_mr_mutex()}; - auto last_mr = host_mr(); - host_mr() = mr; - return last_mr; -} - -rmm::host_async_resource_ref get_host_memory_resource() -{ - std::scoped_lock lock{host_mr_mutex()}; - return host_mr(); -} - -bool config_default_host_memory_resource(host_mr_options const& opts) -{ - std::scoped_lock lock{host_mr_mutex()}; - auto did_configure = false; - make_host_mr(opts, &did_configure); - return did_configure; -} - -} // namespace cudf::io +} // namespace cudf::io::detail diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 0883ac3609f..1ae27a2f4ae 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -16,11 +16,10 @@ #pragma once -#include "config_utils.hpp" #include "hostdevice_span.hpp" -#include -#include +#include +#include #include #include #include @@ -53,7 +52,7 @@ class hostdevice_vector { } explicit hostdevice_vector(size_t initial_size, size_t max_size, rmm::cuda_stream_view stream) - : h_data({cudf::io::get_host_memory_resource(), stream}), d_data(max_size, stream) + : h_data{make_pinned_vector_async(0, stream)}, d_data(max_size, stream) { CUDF_EXPECTS(initial_size <= max_size, "initial_size cannot be larger than max_size"); @@ -173,7 +172,7 @@ class hostdevice_vector { } private: - cudf::detail::rmm_host_vector h_data; + cudf::detail::host_vector h_data; rmm::device_uvector d_data; }; diff --git a/cpp/src/utilities/pinned_memory.cpp b/cpp/src/utilities/pinned_memory.cpp new file mode 100644 index 00000000000..5d2e3ac332a --- /dev/null +++ b/cpp/src/utilities/pinned_memory.cpp @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace cudf { + +namespace { +class fixed_pinned_pool_memory_resource { + using upstream_mr = rmm::mr::pinned_host_memory_resource; + using host_pooled_mr = rmm::mr::pool_memory_resource; + + private: + upstream_mr upstream_mr_{}; + size_t pool_size_{0}; + // Raw pointer to avoid a segfault when the pool is destroyed on exit + host_pooled_mr* pool_{nullptr}; + void* pool_begin_{nullptr}; + void* pool_end_{nullptr}; + cuda::stream_ref stream_{cudf::detail::global_cuda_stream_pool().get_stream().value()}; + + public: + fixed_pinned_pool_memory_resource(size_t size) + : pool_size_{size}, pool_{new host_pooled_mr(upstream_mr_, size, size)} + { + if (pool_size_ == 0) { return; } + + // Allocate full size from the pinned pool to figure out the beginning and end address + pool_begin_ = pool_->allocate_async(pool_size_, stream_); + pool_end_ = static_cast(static_cast(pool_begin_) + pool_size_); + pool_->deallocate_async(pool_begin_, pool_size_, stream_); + } + + void* allocate_async(std::size_t bytes, std::size_t alignment, cuda::stream_ref stream) + { + if (bytes <= pool_size_) { + try { + return pool_->allocate_async(bytes, alignment, stream); + } catch (...) { + // If the pool is exhausted, fall back to the upstream memory resource + } + } + + return upstream_mr_.allocate_async(bytes, alignment, stream); + } + + void* allocate_async(std::size_t bytes, cuda::stream_ref stream) + { + return allocate_async(bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + void* allocate(std::size_t bytes, std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) + { + auto const result = allocate_async(bytes, alignment, stream_); + stream_.wait(); + return result; + } + + void deallocate_async(void* ptr, + std::size_t bytes, + std::size_t alignment, + cuda::stream_ref stream) noexcept + { + if (bytes <= pool_size_ && ptr >= pool_begin_ && ptr < pool_end_) { + pool_->deallocate_async(ptr, bytes, alignment, stream); + } else { + upstream_mr_.deallocate_async(ptr, bytes, alignment, stream); + } + } + + void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept + { + return deallocate_async(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + void deallocate(void* ptr, + std::size_t bytes, + std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept + { + deallocate_async(ptr, bytes, alignment, stream_); + stream_.wait(); + } + + bool operator==(fixed_pinned_pool_memory_resource const& other) const + { + return pool_ == other.pool_ and stream_ == other.stream_; + } + + bool operator!=(fixed_pinned_pool_memory_resource const& other) const + { + return !operator==(other); + } + + friend void get_property(fixed_pinned_pool_memory_resource const&, + cuda::mr::device_accessible) noexcept + { + } + + friend void get_property(fixed_pinned_pool_memory_resource const&, + cuda::mr::host_accessible) noexcept + { + } +}; + +static_assert(cuda::mr::resource_with, + "Pinned pool mr must be accessible from both host and device"); + +CUDF_EXPORT rmm::host_device_async_resource_ref& make_default_pinned_mr( + std::optional config_size) +{ + static fixed_pinned_pool_memory_resource mr = [config_size]() { + auto const size = [&config_size]() -> size_t { + if (auto const env_val = getenv("LIBCUDF_PINNED_POOL_SIZE"); env_val != nullptr) { + return std::atol(env_val); + } + + if (config_size.has_value()) { return *config_size; } + + auto const total = rmm::available_device_memory().second; + // 0.5% of the total device memory, capped at 100MB + return std::min(total / 200, size_t{100} * 1024 * 1024); + }(); + + // rmm requires the pool size to be a multiple of 256 bytes + auto const aligned_size = rmm::align_up(size, rmm::RMM_DEFAULT_HOST_ALIGNMENT); + CUDF_LOG_INFO("Pinned pool size = {}", aligned_size); + + // make the pool with max size equal to the initial size + return fixed_pinned_pool_memory_resource{aligned_size}; + }(); + + static rmm::host_device_async_resource_ref mr_ref{mr}; + return mr_ref; +} + +CUDF_EXPORT std::mutex& host_mr_mutex() +{ + static std::mutex map_lock; + return map_lock; +} + +// Must be called with the host_mr_mutex mutex held +CUDF_EXPORT rmm::host_device_async_resource_ref& make_host_mr( + std::optional const& opts, bool* did_configure = nullptr) +{ + static rmm::host_device_async_resource_ref* mr_ref = nullptr; + bool configured = false; + if (mr_ref == nullptr) { + configured = true; + mr_ref = &make_default_pinned_mr(opts ? opts->pool_size : std::nullopt); + } + + // If the user passed an out param to detect whether this call configured a resource + // set the result + if (did_configure != nullptr) { *did_configure = configured; } + + return *mr_ref; +} + +// Must be called with the host_mr_mutex mutex held +CUDF_EXPORT rmm::host_device_async_resource_ref& host_mr() +{ + static rmm::host_device_async_resource_ref mr_ref = make_host_mr(std::nullopt); + return mr_ref; +} + +} // namespace + +rmm::host_device_async_resource_ref set_pinned_memory_resource( + rmm::host_device_async_resource_ref mr) +{ + std::scoped_lock lock{host_mr_mutex()}; + auto last_mr = host_mr(); + host_mr() = mr; + return last_mr; +} + +rmm::host_device_async_resource_ref get_pinned_memory_resource() +{ + std::scoped_lock lock{host_mr_mutex()}; + return host_mr(); +} + +bool config_default_pinned_memory_resource(pinned_mr_options const& opts) +{ + std::scoped_lock lock{host_mr_mutex()}; + auto did_configure = false; + make_host_mr(opts, &did_configure); + return did_configure; +} + +} // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 826f879ddc0..f6d762cc2ec 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -380,15 +380,16 @@ ConfigureTest( # * utilities tests ------------------------------------------------------------------------------- ConfigureTest( UTILITIES_TEST - utilities_tests/type_list_tests.cpp utilities_tests/column_debug_tests.cpp utilities_tests/column_utilities_tests.cpp utilities_tests/column_wrapper_tests.cpp + utilities_tests/default_stream_tests.cpp utilities_tests/io_utilities_tests.cpp utilities_tests/lists_column_wrapper_tests.cpp utilities_tests/logger_tests.cpp - utilities_tests/default_stream_tests.cpp + utilities_tests/pinned_memory_tests.cpp utilities_tests/type_check_tests.cpp + utilities_tests/type_list_tests.cpp ) # ################################################################################################## diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 57aa2721756..4c01a1fb87b 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -28,13 +28,13 @@ #include #include #include -#include #include #include #include #include #include #include +#include #include @@ -2068,7 +2068,7 @@ TEST_F(JsonReaderTest, JSONLinesRecoveringSync) size_t{128} * 1024 * 1024}; // Set new resource - auto last_mr = cudf::io::set_host_memory_resource(mr); + auto last_mr = cudf::set_pinned_memory_resource(mr); /** * @brief Spark has the specific need to ignore extra characters that come after the first record @@ -2158,7 +2158,7 @@ TEST_F(JsonReaderTest, JSONLinesRecoveringSync) float64_wrapper{c_data.cbegin(), c_data.cend(), c_validity.cbegin()}); // Restore original memory source - cudf::io::set_host_memory_resource(last_mr); + cudf::set_pinned_memory_resource(last_mr); } TEST_F(JsonReaderTest, MixedTypes) diff --git a/cpp/tests/utilities_tests/io_utilities_tests.cpp b/cpp/tests/utilities_tests/io_utilities_tests.cpp index e5a153bf781..9ed8f18f5cc 100644 --- a/cpp/tests/utilities_tests/io_utilities_tests.cpp +++ b/cpp/tests/utilities_tests/io_utilities_tests.cpp @@ -16,14 +16,6 @@ #include #include -#include - -#include -#include - -#include -#include -#include #include @@ -32,43 +24,6 @@ using cudf::io::detail::base64_encode; class IoUtilitiesTest : public cudf::test::BaseFixture {}; -TEST(IoUtilitiesTest, HostMemoryGetAndSet) -{ - // Global environment for temporary files - auto const temp_env = static_cast( - ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); - - // pinned/pooled host memory resource - using host_pooled_mr = rmm::mr::pool_memory_resource; - host_pooled_mr mr(std::make_shared().get(), - size_t{128} * 1024 * 1024); - - // set new resource - auto last_mr = cudf::io::get_host_memory_resource(); - cudf::io::set_host_memory_resource(mr); - - constexpr int num_rows = 32 * 1024; - auto valids = - cudf::detail::make_counting_transform_iterator(0, [&](int index) { return index % 2; }); - auto values = thrust::make_counting_iterator(0); - - cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids); - - cudf::table_view expected({col}); - auto filepath = temp_env->get_temp_filepath("IoUtilsMemTest.parquet"); - cudf::io::parquet_writer_options out_args = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); - cudf::io::write_parquet(out_args); - - cudf::io::parquet_reader_options const read_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); - auto const result = cudf::io::read_parquet(read_opts); - CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected); - - // reset memory resource back - cudf::io::set_host_memory_resource(last_mr); -} - TEST(IoUtilitiesTest, Base64EncodeAndDecode) { // a vector of lorem ipsum strings diff --git a/cpp/tests/utilities_tests/pinned_memory_tests.cpp b/cpp/tests/utilities_tests/pinned_memory_tests.cpp new file mode 100644 index 00000000000..df9103640f4 --- /dev/null +++ b/cpp/tests/utilities_tests/pinned_memory_tests.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include + +#include +#include +#include + +class PinnedMemoryTest : public cudf::test::BaseFixture {}; + +TEST(PinnedMemoryTest, MemoryResourceGetAndSet) +{ + // Global environment for temporary files + auto const temp_env = static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + + // pinned/pooled host memory resource + using host_pooled_mr = rmm::mr::pool_memory_resource; + host_pooled_mr mr(std::make_shared().get(), + 4 * 1024 * 1024); + + // set new resource + auto last_mr = cudf::get_pinned_memory_resource(); + cudf::set_pinned_memory_resource(mr); + + constexpr int num_rows = 32 * 1024; + auto valids = + cudf::detail::make_counting_transform_iterator(0, [&](int index) { return index % 2; }); + auto values = thrust::make_counting_iterator(0); + + cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids); + + cudf::table_view expected({col}); + auto filepath = temp_env->get_temp_filepath("MemoryResourceGetAndSetTest.parquet"); + cudf::io::parquet_writer_options out_args = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); + cudf::io::write_parquet(out_args); + + cudf::io::parquet_reader_options const read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto const result = cudf::io::read_parquet(read_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected); + + // reset memory resource back + cudf::set_pinned_memory_resource(last_mr); +} diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 83b801db7fb..df0d9dc7c3e 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -128,9 +128,9 @@ public static synchronized void initialize(long poolSize, int gpuId) { * * @param poolSize size of the pool to initialize. * @param gpuId gpu id to set to get memory pool from, -1 means to use default - * @param setCuioHostMemoryResource true if this pinned pool should be used by cuIO for host memory + * @param setCudfPinnedPoolMemoryResource true if this pinned pool should be used by cuDF for pinned memory */ - public static synchronized void initialize(long poolSize, int gpuId, boolean setCuioHostMemoryResource) { + public static synchronized void initialize(long poolSize, int gpuId, boolean setCudfPinnedPoolMemoryResource) { if (isInitialized()) { throw new IllegalStateException("Can only initialize the pool once."); } @@ -139,7 +139,7 @@ public static synchronized void initialize(long poolSize, int gpuId, boolean set t.setDaemon(true); return t; }); - initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId, setCuioHostMemoryResource)); + initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId, setCudfPinnedPoolMemoryResource)); initService.shutdown(); } @@ -216,15 +216,15 @@ public static long getTotalPoolSizeBytes() { return 0; } - private PinnedMemoryPool(long poolSize, int gpuId, boolean setCuioHostMemoryResource) { + private PinnedMemoryPool(long poolSize, int gpuId, boolean setCudfPinnedPoolMemoryResource) { if (gpuId > -1) { // set the gpu device to use Cuda.setDevice(gpuId); Cuda.freeZero(); } this.poolHandle = Rmm.newPinnedPoolMemoryResource(poolSize, poolSize); - if (setCuioHostMemoryResource) { - Rmm.setCuioPinnedPoolMemoryResource(this.poolHandle); + if (setCudfPinnedPoolMemoryResource) { + Rmm.setCudfPinnedPoolMemoryResource(this.poolHandle); } this.poolSize = poolSize; } diff --git a/java/src/main/java/ai/rapids/cudf/Rmm.java b/java/src/main/java/ai/rapids/cudf/Rmm.java index 4dee1b7aa24..ed029c918e4 100755 --- a/java/src/main/java/ai/rapids/cudf/Rmm.java +++ b/java/src/main/java/ai/rapids/cudf/Rmm.java @@ -597,7 +597,7 @@ static native long newEventHandlerResourceAdaptor(long handle, long trackerHandl public static native long newPinnedPoolMemoryResource(long initSize, long maxSize); - public static native long setCuioPinnedPoolMemoryResource(long poolPtr); + public static native long setCudfPinnedPoolMemoryResource(long poolPtr); public static native void releasePinnedPoolMemoryResource(long poolPtr); diff --git a/java/src/main/native/src/RmmJni.cpp b/java/src/main/native/src/RmmJni.cpp index fa78f6ca4e2..8bd0f7793b4 100644 --- a/java/src/main/native/src/RmmJni.cpp +++ b/java/src/main/native/src/RmmJni.cpp @@ -16,7 +16,7 @@ #include "cudf_jni_apis.hpp" -#include +#include #include #include @@ -395,15 +395,17 @@ class java_debug_event_handler_memory_resource final : public java_event_handler } }; -inline auto& prior_cuio_host_mr() +inline auto& prior_cudf_pinned_mr() { - static rmm::host_async_resource_ref _prior_cuio_host_mr = cudf::io::get_host_memory_resource(); - return _prior_cuio_host_mr; + static rmm::host_device_async_resource_ref _prior_cudf_pinned_mr = + cudf::get_pinned_memory_resource(); + return _prior_cudf_pinned_mr; } /** * This is a pinned fallback memory resource that will try to allocate `pool` - * and if that fails, attempt to allocate from the prior resource used by cuIO `prior_cuio_host_mr`. + * and if that fails, attempt to allocate from the prior resource used by cuDF + * `prior_cudf_pinned_mr`. * * We detect whether a pointer to free is inside of the pool by checking its address (see * constructor) @@ -433,7 +435,7 @@ class pinned_fallback_host_memory_resource { /** * @brief Allocates pinned host memory of size at least \p bytes bytes from either the - * _pool argument provided, or prior_cuio_host_mr. + * _pool argument provided, or prior_cudf_pinned_mr. * * @throws rmm::bad_alloc if the requested allocation could not be fulfilled due to any other * reason. @@ -450,7 +452,7 @@ class pinned_fallback_host_memory_resource { return _pool->allocate(bytes, alignment); } catch (const std::exception& unused) { // try to allocate using the underlying pinned resource - return prior_cuio_host_mr().allocate(bytes, alignment); + return prior_cudf_pinned_mr().allocate(bytes, alignment); } // we should not reached here return nullptr; @@ -459,7 +461,7 @@ class pinned_fallback_host_memory_resource { /** * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes. We attempt * to deallocate from _pool, if ptr is detected to be in the pool address range, - * otherwise we deallocate from `prior_cuio_host_mr`. + * otherwise we deallocate from `prior_cudf_pinned_mr`. * * @param ptr Pointer to be deallocated. * @param bytes Size of the allocation. @@ -472,7 +474,7 @@ class pinned_fallback_host_memory_resource { if (ptr >= pool_begin_ && ptr <= pool_end_) { _pool->deallocate(ptr, bytes, alignment); } else { - prior_cuio_host_mr().deallocate(ptr, bytes, alignment); + prior_cudf_pinned_mr().deallocate(ptr, bytes, alignment); } } @@ -1025,7 +1027,7 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_newPinnedPoolMemoryResource(JNIE CATCH_STD(env, 0) } -JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_setCuioPinnedPoolMemoryResource(JNIEnv* env, +JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_setCudfPinnedPoolMemoryResource(JNIEnv* env, jclass clazz, jlong pool_ptr) { @@ -1035,7 +1037,7 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_setCuioPinnedPoolMemoryResource(J // create a pinned fallback pool that will allocate pinned memory // if the regular pinned pool is exhausted pinned_fallback_mr.reset(new pinned_fallback_host_memory_resource(pool)); - prior_cuio_host_mr() = cudf::io::set_host_memory_resource(*pinned_fallback_mr); + prior_cudf_pinned_mr() = cudf::set_pinned_memory_resource(*pinned_fallback_mr); } CATCH_STD(env, ) } @@ -1047,8 +1049,8 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_releasePinnedPoolMemoryResource(J try { cudf::jni::auto_set_device(env); // set the cuio host memory resource to what it was before, or the same - // if we didn't overwrite it with setCuioPinnedPoolMemoryResource - cudf::io::set_host_memory_resource(prior_cuio_host_mr()); + // if we didn't overwrite it with setCudfPinnedPoolMemoryResource + cudf::set_pinned_memory_resource(prior_cudf_pinned_mr()); pinned_fallback_mr.reset(); delete reinterpret_cast(pool_ptr); } @@ -1088,7 +1090,7 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_allocFromFallbackPinnedPool(JNIE jlong size) { cudf::jni::auto_set_device(env); - void* ret = cudf::io::get_host_memory_resource().allocate(size); + void* ret = cudf::get_pinned_memory_resource().allocate(size); return reinterpret_cast(ret); } @@ -1101,7 +1103,7 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_freeFromFallbackPinnedPool(JNIEnv try { cudf::jni::auto_set_device(env); void* cptr = reinterpret_cast(ptr); - cudf::io::get_host_memory_resource().deallocate(cptr, size); + cudf::get_pinned_memory_resource().deallocate(cptr, size); } CATCH_STD(env, ) } @@ -1112,7 +1114,7 @@ JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_Rmm_configureDefaultCudfPinnedPoo { try { cudf::jni::auto_set_device(env); - return cudf::io::config_default_host_memory_resource(cudf::io::host_mr_options{size}); + return cudf::config_default_pinned_memory_resource(cudf::pinned_mr_options{size}); } CATCH_STD(env, false) }