Add multi-threaded writing to GDS writes #9372

Merged (8 commits, Oct 8, 2021)
Changes from 2 commits
29 changes: 29 additions & 0 deletions cpp/include/cudf/io/data_sink.hpp
@@ -21,6 +21,7 @@

#include <rmm/cuda_stream_view.hpp>

#include <future>
#include <memory>
#include <string>
#include <vector>
@@ -134,6 +135,34 @@ class data_sink {
CUDF_FAIL("data_sink classes that support device_write must override it.");
}

/**
* @brief Asynchronously append the buffer content to the sink from a GPU address
*
* For optimal performance, should only be called when `is_device_write_preferred` returns `true`.
* Data sink implementations that don't support direct device writes don't need to override
* this function.
*
* `gpu_data` must not be freed until the returned future is synchronized.
* @code{.pseudo}
* auto result = device_write_async(gpu_data, size, stream);
* result.wait(); // OR result.get()
* @endcode
*
* @throws cudf::logic_error if the object does not support direct device writes, i.e.
* `supports_device_write` returns `false`.
*
* @param gpu_data Pointer to the buffer to be written into the sink object
* @param size Number of bytes to write
* @param stream CUDA stream to use
*/
virtual std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream)
{
CUDF_FAIL("data_sink classes that support device_write must override it.");
}

/**
* @brief Flush the data written into the sink
*/
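For reference, a caller-side sketch of how the new `device_write_async` virtual composes with the existing `supports_device_write` and `is_device_write_preferred` checks. This is not part of the PR; the `write_all` helper and `d_buffers` container are illustrative.

#include <cudf/io/data_sink.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <future>
#include <utility>
#include <vector>

// Queue device writes for a batch of GPU buffers and synchronize once at the end.
void write_all(cudf::io::data_sink& sink,
               std::vector<std::pair<void const*, size_t>> const& d_buffers,
               rmm::cuda_stream_view stream)
{
  std::vector<std::future<void>> pending;
  for (auto const& [ptr, size] : d_buffers) {
    if (sink.supports_device_write() && sink.is_device_write_preferred(size)) {
      // The device buffer must stay alive until the returned future is synchronized.
      pending.push_back(sink.device_write_async(ptr, size, stream));
    }
    // else: copy to host and use sink.host_write(...) instead (omitted here).
  }
  for (auto& task : pending) {
    task.wait();
  }
  sink.flush();
}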
7 changes: 6 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
@@ -1379,6 +1379,7 @@ void writer::impl::write(table_view const& table)
(stats_granularity_ == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr,
(stats_granularity_ != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages
: nullptr);
std::vector<std::future<void>> write_tasks;
for (; r < rnext; r++, global_r++) {
for (auto i = 0; i < num_columns; i++) {
gpu::EncColumnChunk* ck = &chunks[r][i];
@@ -1392,7 +1393,8 @@

if (out_sink_->is_device_write_preferred(ck->compressed_size)) {
// let the writer do what it wants to retrieve the data from the gpu.
write_tasks.push_back(
out_sink_->device_write_async(dev_bfr + ck->ck_stat_size, ck->compressed_size, stream));
// we still need to do a (much smaller) memcpy for the statistics.
if (ck->ck_stat_size != 0) {
md.row_groups[global_r].columns[i].meta_data.statistics_blob.resize(ck->ck_stat_size);
@@ -1438,6 +1440,9 @@
current_chunk_offset += ck->compressed_size;
}
}
for (auto& task : write_tasks) {
task.wait();
}
}
}

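The writer change above follows a queue-then-wait pattern: each column chunk's write is started as soon as its buffer is ready, and the writer synchronizes once per row group. A toy, cuDF-free illustration of the same pattern, with sleeps standing in for the actual device writes:

#include <chrono>
#include <future>
#include <thread>
#include <vector>

int main()
{
  std::vector<std::future<void>> write_tasks;
  for (int chunk = 0; chunk < 4; ++chunk) {
    // Stand-in for out_sink_->device_write_async(...): starts immediately and
    // runs concurrently with the remaining iterations of the loop.
    write_tasks.push_back(std::async(std::launch::async, [chunk] {
      std::this_thread::sleep_for(std::chrono::milliseconds(10 * (chunk + 1)));
    }));
  }
  // Single synchronization point, mirroring the wait loop added in the writer.
  for (auto& task : write_tasks) {
    task.wait();
  }
}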
30 changes: 28 additions & 2 deletions cpp/src/io/utilities/data_sink.cpp
@@ -64,6 +64,17 @@ class file_sink : public data_sink {
_bytes_written += size;
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
_bytes_written += size;
return result;
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
@@ -111,6 +122,14 @@ class void_sink : public data_sink {
_bytes_written += size;
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
_bytes_written += size;
return std::async(std::launch::deferred, [] {});
}

void flush() override {}

size_t bytes_written() override { return _bytes_written; }
@@ -130,10 +149,17 @@ class user_sink_wrapper : public data_sink {
bool supports_device_write() const override { return user_sink->supports_device_write(); }

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
device_write_async(gpu_data, size, stream).get();
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(user_sink->supports_device_write(),
"device write being called on a data_sink that doesn't support it");
return user_sink->device_write_async(gpu_data, size, stream);
}

void flush() override { user_sink->flush(); }
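Two idioms in data_sink.cpp are worth calling out: the synchronous `device_write` is now a thin wrapper that calls the async version and immediately `get()`s it, and sinks whose work completes synchronously (like `void_sink`) return an already-satisfied future built with `std::launch::deferred`, which runs the empty callable inline instead of spawning a thread. A standalone sketch of the latter idiom; the `make_ready_future` name is illustrative:

#include <future>

// Returns a future that completes on the calling thread the moment it is waited on;
// no worker thread is created for the empty callable.
std::future<void> make_ready_future() { return std::async(std::launch::deferred, [] {}); }

int main()
{
  auto done = make_ready_future();
  done.get();  // runs the no-op lambda inline and returns immediately
}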
44 changes: 41 additions & 3 deletions cpp/src/io/utilities/file_io_utilities.cpp
@@ -233,14 +233,52 @@ size_t cufile_input_impl::read(size_t offset,
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664),
pool(16)
{
}

void cufile_output_impl::write(void const* data, size_t offset, size_t size)
{
write_async(data, offset, size).wait();
}

std::future<void> cufile_output_impl::write_async(void const* data, size_t offset, size_t size)
{
int device;
cudaGetDevice(&device);

auto write_slice = [=](void const* src, size_t size, size_t offset) -> void {
cudaSetDevice(device);
auto write_size = shim->write(cf_file.handle(), src, size, offset, 0);
CUDF_EXPECTS(write_size != -1 and write_size == static_cast<decltype(write_size)>(size),
"cuFile error writing to a file");
};

auto source = static_cast<uint8_t const*>(data);
std::vector<std::future<bool>> slice_tasks;
constexpr size_t max_slice_bytes = 4 * 1024 * 1024;
size_t n_slices = util::div_rounding_up_safe(size, max_slice_bytes);
size_t slice_size = max_slice_bytes;
size_t slice_offset = 0;
for (size_t t = 0; t < n_slices; ++t) {
void const* src_slice = source + slice_offset;

if (t == n_slices - 1) { slice_size = size - slice_offset; }
slice_tasks.push_back(pool.submit(write_slice, src_slice, slice_size, offset + slice_offset));

slice_offset += slice_size;
}
auto waiter = [](decltype(slice_tasks) slice_tasks) -> void {
for (auto& task : slice_tasks) {
task.wait();
}
};
// The future returned from this function is deferred, not async, because we want to avoid
// creating a thread for each write_async call. That overhead is significant in the case of
// multiple small writes.
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}
#endif

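The slicing above splits each buffer into 4 MiB pieces, submits one `write_slice` task per piece to a 16-thread pool, and returns a deferred future that waits on all of them. A standalone sketch of just the slicing arithmetic, including the final (possibly short) slice:

#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  constexpr std::size_t max_slice_bytes = 4 * 1024 * 1024;  // 4 MiB, as in the PR
  std::vector<std::size_t> sizes{1, max_slice_bytes, 10'000'000};
  for (auto size : sizes) {
    // Equivalent of util::div_rounding_up_safe(size, max_slice_bytes)
    std::size_t n_slices     = (size + max_slice_bytes - 1) / max_slice_bytes;
    std::size_t slice_offset = 0;
    for (std::size_t t = 0; t < n_slices; ++t) {
      // The last slice carries whatever is left, which may be a full slice.
      std::size_t slice_size = (t == n_slices - 1) ? size - slice_offset : max_slice_bytes;
      std::printf("size=%zu slice=%zu offset=%zu bytes=%zu\n", size, t, slice_offset, slice_size);
      slice_offset += slice_size;
    }
  }
}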
28 changes: 28 additions & 0 deletions cpp/src/io/utilities/file_io_utilities.hpp
@@ -142,6 +142,20 @@ class cufile_output : public cufile_io_base {
* @param size Number of bytes to write
*/
virtual void write(void const* data, size_t offset, size_t size) = 0;

/**
* @brief Asynchronously writes the data from a device buffer into a file.
*
* It is the caller's responsibility to not invalidate `data` until the result from this function
* is synchronized.
*
* @throws cudf::logic_error on cuFile error
*
* @param data Pointer to the buffer to be written into the output file
* @param offset Number of bytes from the start
* @param size Number of bytes to write
*/
virtual std::future<void> write_async(void const* data, size_t offset, size_t size) = 0;
};

#ifdef CUFILE_FOUND
@@ -242,10 +256,12 @@ class cufile_output_impl final : public cufile_output {
cufile_output_impl(std::string const& filepath);

void write(void const* data, size_t offset, size_t size) override;
std::future<void> write_async(void const* data, size_t offset, size_t size) override;

private:
cufile_shim const* shim = nullptr;
cufile_registered_file const cf_file;
cudf::detail::thread_pool pool;
};
#else

@@ -262,6 +278,14 @@ class cufile_input_impl final : public cufile_input {
{
CUDF_FAIL("Only used to compile without cufile library, should not be called");
}

std::future<size_t> read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_FAIL("Only used to compile without cufile library, should not be called");
}
};

class cufile_output_impl final : public cufile_output {
@@ -270,6 +294,10 @@ class cufile_output_impl final : public cufile_output {
{
CUDF_FAIL("Only used to compile without cufile library, should not be called");
}
std::future<void> write_async(void const* data, size_t offset, size_t size) override
{
CUDF_FAIL("Only used to compile without cufile library, should not be called");
}
};
#endif

16 changes: 16 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -1048,13 +1048,21 @@ class custom_test_data_sink : public cudf::io::data_sink {
bool supports_device_write() const override { return true; }

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
this->device_write_async(gpu_data, size, stream).get();
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
char* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
CUDA_TRY(cudaMemcpyAsync(ptr, gpu_data, size, cudaMemcpyDeviceToHost, stream.value()));
stream.synchronize();
outfile_.write(ptr, size);
CUDA_TRY(cudaFreeHost(ptr));
return std::async(std::launch::deferred, [] {});
}

void flush() override { outfile_.flush(); }
@@ -2011,13 +2019,21 @@ class custom_test_memmap_sink : public cudf::io::data_sink {
bool supports_device_write() const override { return supports_device_writes; }

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
this->device_write_async(gpu_data, size, stream).get();
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
char* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
CUDA_TRY(cudaMemcpyAsync(ptr, gpu_data, size, cudaMemcpyDeviceToHost, stream.value()));
stream.synchronize();
mm_writer->host_write(ptr, size);
CUDA_TRY(cudaFreeHost(ptr));
return std::async(std::launch::deferred, [] {});
}

void flush() override { mm_writer->flush(); }
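The test sinks above keep `device_write_async` effectively synchronous: they copy, write, and then return an already-completed deferred future, which keeps the sequential ofstream and memory-mapped writes in order. A hedged sketch of what a genuinely asynchronous sink could look like, using positional writes so that completion order no longer matters; the class name `positional_test_sink` and its members are illustrative, not from the PR, and error handling is omitted.

#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cuda_runtime.h>

#include <fcntl.h>
#include <unistd.h>

#include <future>
#include <utility>
#include <vector>

class positional_test_sink : public cudf::io::data_sink {
 public:
  explicit positional_test_sink(char const* path)
    : fd_(open(path, O_CREAT | O_WRONLY | O_TRUNC, 0644))
  {
  }
  ~positional_test_sink() override { close(fd_); }

  void host_write(void const* data, size_t size) override
  {
    pwrite(fd_, data, size, bytes_written_);  // error handling omitted in this sketch
    bytes_written_ += size;
  }

  bool supports_device_write() const override { return true; }

  void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
  {
    device_write_async(gpu_data, size, stream).get();
  }

  std::future<void> device_write_async(void const* gpu_data,
                                       size_t size,
                                       rmm::cuda_stream_view stream) override
  {
    auto const offset = bytes_written_;  // claim this file range at submission time
    bytes_written_ += size;
    std::vector<char> host_buf(size);
    CUDA_TRY(cudaMemcpyAsync(
      host_buf.data(), gpu_data, size, cudaMemcpyDeviceToHost, stream.value()));
    stream.synchronize();
    // Each task writes at its own offset, so tasks may complete in any order.
    return std::async(std::launch::async, [fd = fd_, offset, buf = std::move(host_buf)] {
      pwrite(fd, buf.data(), buf.size(), offset);  // error handling omitted in this sketch
    });
  }

  void flush() override { fsync(fd_); }
  size_t bytes_written() override { return bytes_written_; }

 private:
  int fd_;
  size_t bytes_written_ = 0;
};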