Skip to content

Commit

Permalink
Adding KvikIO as backend for file_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Apr 6, 2022
1 parent 405b1fc commit ab8aeb0
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,9 @@
#include "file_io_utilities.hpp"
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand All @@ -30,10 +32,17 @@ namespace io {
class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
: _cufile_out(detail::make_cufile_output(filepath))
{
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file");

if (detail::cufile_integration::is_kvikio_enabled()) {
_kvikio_file = kvikio::FileHandle(filepath, "w");
_use_kvikio = true;
} else {
_cufile_out =
std::unique_ptr<detail::cufile_output_impl>(detail::make_cufile_output(filepath));
}
}

virtual ~file_sink() { flush(); }
Expand All @@ -49,36 +58,50 @@ class file_sink : public data_sink {

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; }
[[nodiscard]] bool supports_device_write() const override
{
return _use_kvikio || _cufile_out != nullptr;
}

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size);
return _use_kvikio || (_cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size));
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

_cufile_out->write(gpu_data, _bytes_written, size);
size_t offset = _bytes_written;
_bytes_written += size;

if (_use_kvikio) {
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}
return _cufile_out->write_async(gpu_data, offset, size);
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
if (_use_kvikio) { return device_write_async(gpu_data, _bytes_written, stream).get(); }
_cufile_out->write(gpu_data, _bytes_written, size);
_bytes_written += size;
return result;
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _kvikio_file;
bool _use_kvikio{false};
};

/**
Expand Down

0 comments on commit ab8aeb0

Please sign in to comment.