Skip to content

Commit

Permalink
data_sink: use KvikIO
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Mar 23, 2022
1 parent 061c875 commit 5cc9294
Showing 1 changed file with 17 additions and 19 deletions.
36 changes: 17 additions & 19 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,10 @@

#include <fstream>

#include "file_io_utilities.hpp"
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand All @@ -29,8 +29,7 @@ namespace io {
*/
class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
: _cufile_out(detail::make_cufile_output(filepath))
explicit file_sink(std::string const& filepath) : _file(filepath, "w")
{
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file");
Expand All @@ -49,36 +48,35 @@ class file_sink : public data_sink {

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; }
[[nodiscard]] bool supports_device_write() const override { return true; }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size);
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

_cufile_out->write(gpu_data, _bytes_written, size);
_bytes_written += size;
return size > (128 << 10);
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
size_t offset = _bytes_written;
_bytes_written += size;
return result;
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_file.pwrite(gpu_data, size, offset).get();
});
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
return device_write_async(gpu_data, size, stream).get();
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _file;
};

/**
Expand Down

0 comments on commit 5cc9294

Please sign in to comment.