diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 9584fafb32..498f1d6f5f 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -36,6 +36,7 @@ class AllocRetain { // The size of each allocation in `_free_allocs` std::size_t _size{defaults::bounce_buffer_size()}; + public: /** * @brief An host memory allocation */ @@ -56,6 +57,7 @@ class AllocRetain { Alloc& operator=(Alloc&& o) = delete; ~Alloc() noexcept { _manager->put(_alloc, _size); } void* get() noexcept { return _alloc; } + void* get(std::ptrdiff_t offset) noexcept { return static_cast(_alloc) + offset; } std::size_t size() noexcept { return _size; } }; @@ -67,6 +69,7 @@ class AllocRetain { // ~AllocRetain() noexcept = default; + private: /** * @brief Free all retained allocations * diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index 809500f663..bff96ce0ad 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -15,8 +15,10 @@ */ #pragma once +#include #include #include +#include #include #include #include @@ -34,6 +36,98 @@ namespace kvikio { namespace detail { +/** + * @brief Bounce buffer in pinned host memory. + * + * @note Is not thread-safe. + */ +class BounceBufferH2D { + CUstream _stream; // The CUDA stream to use. + CUdeviceptr _dev; // The output device buffer. + AllocRetain::Alloc _host_buffer; // The host buffer to bounce data on. + std::ptrdiff_t _dev_offset{0}; // Number of bytes written to `_dev`. + std::ptrdiff_t _host_offset{0}; // Number of bytes written to `_host_buffer` (resets on flush). + + public: + /** + * @brief Create a bounce buffer for an output device buffer. + * + * @param stream The CUDA stream used throughout the lifetime of the bounce buffer. + * @param device_buffer The output device buffer (final destination of the data). 
+ */ + BounceBufferH2D(CUstream stream, void* device_buffer) + : _stream{stream}, + _dev{convert_void2deviceptr(device_buffer)}, + _host_buffer{AllocRetain::instance().get()} + { + } + + /** + * @brief The bounce buffer is flushed to device on destruction. + */ + ~BounceBufferH2D() noexcept + { + try { + flush(); + } catch (CUfileException const& e) { + std::cerr << "BounceBufferH2D error on final flush: "; + std::cerr << e.what(); + std::cerr << std::endl; + } + } + + private: + /** + * @brief Write host memory to the output device buffer and synchronize the stream. + * + * @param src The host memory source. + * @param size Number of bytes to write (no-op if zero). + */ + void write_to_device(void const* src, std::size_t size) + { + if (size > 0) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(_dev + _dev_offset, src, size, _stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); + _dev_offset += size; + } + } + + /** + * @brief Flush the bounce buffer by writing everything to the output device buffer. + */ + void flush() + { + write_to_device(_host_buffer.get(), _host_offset); + _host_offset = 0; + } + + public: + /** + * @brief Write host memory to the bounce buffer (also host memory). + * + * Only when the bounce buffer has been filled up is data copied to the output device buffer. + * + * @param data The host memory source. + * @param size Number of bytes to write. + */ + void write(char const* data, std::size_t size) + { + if (_host_buffer.size() - _host_offset < size) { // Not enough space left in the bounce buffer + flush(); + assert(_host_offset == 0); + } + if (_host_buffer.size() < size) { + // If still not enough space, we just copy the data to the device. This only happens when + // `defaults::bounce_buffer_size()` is smaller than 16 KiB, so there is no need to + // optimize for performance in this case. 
+ write_to_device(data, size); + } else if (size > 0) { + std::memcpy(_host_buffer.get(_host_offset), data, size); + _host_offset += size; + } + } +}; + /** * @brief Context used by the "CURLOPT_WRITEFUNCTION" callbacks. */ @@ -46,6 +140,7 @@ struct CallbackContext { : buf{static_cast(buf)}, size{size}, offset{0}, overflow_error{0} { } + BounceBufferH2D* bounce_buffer{nullptr}; // Only used by callback_device_memory }; /** @@ -98,12 +193,7 @@ inline std::size_t callback_device_memory(char* data, } KVIKIO_NVTX_FUNC_RANGE("RemoteHandle - callback_device_memory()", nbytes); - CUstream stream = detail::StreamsByThread::get(); - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( - convert_void2deviceptr(ctx->buf + ctx->offset), data, nbytes, stream)); - // We have to sync since curl might overwrite or free `data`. - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); - + ctx->bounce_buffer->write(data, nbytes); ctx->offset += nbytes; return nbytes; } @@ -414,6 +504,10 @@ class RemoteHandle { /** * @brief Read from remote source into buffer (host or device memory). * + * When reading into device memory, a bounce buffer is used to avoid many small memory + * copies to device. Use `kvikio::defaults::bounce_buffer_size_reset()` to set the size + * of this bounce buffer (default 16 MiB). + * * @param buf Pointer to host or device memory. * @param size Number of bytes to read. * @param file_offset File offset in bytes. @@ -450,6 +544,10 @@ class RemoteHandle { curl.perform(); } else { PushAndPopContext c(get_context_from_pointer(buf)); + // We use a bounce buffer to avoid many small memory copies to device. Libcurl has a + // maximum chunk size of 16 KiB (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller. + detail::BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf); + ctx.bounce_buffer = &bounce_buffer; curl.perform(); } } catch (std::runtime_error const& e) {