Skip to content

Commit

Permalink
Add multi-thread reading to GDS reads (#8752)
Browse files Browse the repository at this point in the history
Adds a thread pool and new `device_read_async` API to datasource.
This API has been added to parquet and ORC readers.
`device_read` now also goes through `device_read_async` and waits for the result, so sufficiently large `device_read` calls should be faster as well. As a result, the Avro reader should also benefit from multi-threaded reading.

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

URL: #8752
  • Loading branch information
devavret authored Jul 23, 2021
1 parent fc95992 commit a69a8a4
Show file tree
Hide file tree
Showing 8 changed files with 560 additions and 29 deletions.
29 changes: 29 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <arrow/result.h>
#include <arrow/status.h>

#include <future>
#include <memory>

namespace cudf {
Expand Down Expand Up @@ -209,6 +210,34 @@ class datasource {
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
* @brief Asynchronously reads a selected range into a preallocated device buffer
*
* Returns a future value that contains the number of bytes read. Calling `get()` method of the
* return value synchronizes this function.
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @note This base-class implementation performs no I/O: its only statement is a `CUDF_FAIL`, so
* it never produces a future. A data source that wants to offer asynchronous device reads has to
* provide its own override.
*
* @throws cudf::logic_error when the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read as a future value (can be smaller than size)
*/
virtual std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
// Default behaviour: asynchronous device reads are opt-in, so reaching this body means the
// derived class did not override the function.
CUDF_FAIL("datasource classes that support device_read_async must override it.");
}

/**
* @brief Returns the size of the data in the source.
*
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
size_t num_rowgroups = 0;
int stripe_idx = 0;

std::vector<std::pair<std::future<size_t>, size_t>> read_tasks;
for (auto const& stripe_source_mapping : selected_stripes) {
// Iterate through the source files selected stripes
for (auto const& stripe : stripe_source_mapping.stripe_info) {
Expand Down Expand Up @@ -1170,10 +1171,11 @@ table_with_metadata reader::impl::read(size_type skip_rows,
}
if (_metadata->per_file_metadata[stripe_source_mapping.source_idx]
.source->is_device_read_preferred(len)) {
CUDF_EXPECTS(
_metadata->per_file_metadata[stripe_source_mapping.source_idx].source->device_read(
offset, len, d_dst, stream) == len,
"Unexpected discrepancy in bytes read.");
read_tasks.push_back(
std::make_pair(_metadata->per_file_metadata[stripe_source_mapping.source_idx]
.source->device_read_async(offset, len, d_dst, stream),
len));

} else {
const auto buffer =
_metadata->per_file_metadata[stripe_source_mapping.source_idx].source->host_read(
Expand Down Expand Up @@ -1246,6 +1248,9 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stripe_idx++;
}
}
for (auto& task : read_tasks) {
CUDF_EXPECTS(task.first.get() == task.second, "Unexpected discrepancy in bytes read.");
}

// Process dataset chunk pages into output columns
if (stripe_data.size() != 0) {
Expand Down
33 changes: 24 additions & 9 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
/**
* @copydoc cudf::io::detail::parquet::read_column_chunks
*/
void reader::impl::read_column_chunks(
std::future<void> reader::impl::read_column_chunks(
std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks, // TODO const?
size_t begin_chunk,
Expand All @@ -833,6 +833,7 @@ void reader::impl::read_column_chunks(
rmm::cuda_stream_view stream)
{
// Transfer chunk data, coalescing adjacent chunks
std::vector<std::future<size_t>> read_tasks;
for (size_t chunk = begin_chunk; chunk < end_chunk;) {
const size_t io_offset = column_chunk_offsets[chunk];
size_t io_size = chunks[chunk].compressed_size;
Expand All @@ -854,7 +855,11 @@ void reader::impl::read_column_chunks(
if (io_size != 0) {
auto& source = _sources[chunk_source_map[chunk]];
if (source->is_device_read_preferred(io_size)) {
page_data[chunk] = source->device_read(io_offset, io_size, stream);
auto buffer = rmm::device_buffer(io_size, stream);
auto fut_read_size = source->device_read_async(
io_offset, io_size, static_cast<uint8_t*>(buffer.data()), stream);
read_tasks.emplace_back(std::move(fut_read_size));
page_data[chunk] = datasource::buffer::create(std::move(buffer));
} else {
auto const buffer = source->host_read(io_offset, io_size);
page_data[chunk] =
Expand All @@ -869,6 +874,12 @@ void reader::impl::read_column_chunks(
chunk = next_chunk;
}
}
auto sync_fn = [](decltype(read_tasks) read_tasks) {
for (auto& task : read_tasks) {
task.wait();
}
};
return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));
}

/**
Expand Down Expand Up @@ -1435,6 +1446,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Initialize column chunk information
size_t total_decompressed_size = 0;
auto remaining_rows = num_rows;
std::vector<std::future<void>> read_rowgroup_tasks;
for (const auto& rg : selected_row_groups) {
const auto& row_group = _metadata->get_row_group(rg.index, rg.source_index);
auto const row_group_start = rg.start_row;
Expand Down Expand Up @@ -1502,16 +1514,19 @@ table_with_metadata reader::impl::read(size_type skip_rows,
}
}
// Read compressed chunk data to device memory
read_column_chunks(page_data,
chunks,
io_chunk_idx,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
stream);
read_rowgroup_tasks.push_back(read_column_chunks(page_data,
chunks,
io_chunk_idx,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
stream));

remaining_rows -= row_group.num_rows;
}
for (auto& task : read_rowgroup_tasks) {
task.wait();
}
assert(remaining_rows <= 0);

// Process dataset chunk pages into output columns
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ class reader::impl {
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
void read_column_chunks(std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
size_t begin_chunk,
size_t end_chunk,
const std::vector<size_t>& column_chunk_offsets,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream);
std::future<void> read_column_chunks(std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
size_t begin_chunk,
size_t end_chunk,
const std::vector<size_t>& column_chunk_offsets,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream);

/**
* @brief Returns the number of total pages from the given column chunks
Expand Down
13 changes: 12 additions & 1 deletion cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class file_source : public datasource {

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
bool is_device_read_preferred(size_t size) const override
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}
Expand All @@ -67,6 +67,17 @@ class file_source : public datasource {
return _cufile_in->read(offset, read_size, dst, stream);
}

/**
 * @brief Asynchronously reads a range of the file directly into device memory.
 *
 * The request is clamped to the bytes available between `offset` and the end of the file and
 * then forwarded to the cuFile input's `read_async`.
 *
 * @throws cudf::logic_error if device reads are not supported for this file
 *
 * @param offset Number of bytes from the start of the file
 * @param size Number of bytes requested
 * @param dst Address of the existing device memory
 * @param stream CUDA stream forwarded to the cuFile input
 *
 * @return Future holding the number of bytes read; zero bytes are requested when `offset` is at
 * or past the end of the file
 */
std::future<size_t> device_read_async(size_t offset,
                                      size_t size,
                                      uint8_t* dst,
                                      rmm::cuda_stream_view stream) override
{
  CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

  // `_file.size() - offset` is an unsigned subtraction: without the guard it wraps around when
  // `offset` lies beyond the end of the file, and the clamp below would silently become a no-op.
  size_t const file_size = _file.size();
  size_t const read_size = offset < file_size ? std::min(size, file_size - offset) : size_t{0};
  return _cufile_in->read_async(offset, read_size, dst, stream);
}

size_t size() const override { return _file.size(); }

protected:
Expand Down
56 changes: 48 additions & 8 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
* limitations under the License.
*/
#include "file_io_utilities.hpp"
#include <cudf/detail/utilities/integer_utils.hpp>

#include <rmm/device_buffer.hpp>

#include <dlfcn.h>

#include <fstream>
#include <numeric>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -166,30 +168,68 @@ void cufile_registered_file::register_handle()
cufile_registered_file::~cufile_registered_file() { shim->handle_deregister(cf_handle); }

cufile_input_impl::cufile_input_impl(std::string const& filepath)
: shim{cufile_shim::instance()}, cf_file(shim, filepath, O_RDONLY | O_DIRECT)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_RDONLY | O_DIRECT),
pool(16) // The benefit from multithreaded read plateaus around 16 threads
{
pool.sleep_duration = 10;
}

std::unique_ptr<datasource::buffer> cufile_input_impl::read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
rmm::device_buffer out_data(size, stream);
CUDF_EXPECTS(shim->read(cf_file.handle(), out_data.data(), size, offset, 0) != -1,
"cuFile error reading from a file");

auto read_size = read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read_size, stream);
return datasource::buffer::create(std::move(out_data));
}

std::future<size_t> cufile_input_impl::read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
int device;
cudaGetDevice(&device);

auto read_slice = [=](void* dst, size_t size, size_t offset) -> ssize_t {
cudaSetDevice(device);
auto read_size = shim->read(cf_file.handle(), dst, size, offset, 0);
CUDF_EXPECTS(read_size != -1, "cuFile error reading from a file");
return read_size;
};

std::vector<std::future<ssize_t>> slice_tasks;
constexpr size_t max_slice_bytes = 4 * 1024 * 1024;
size_t n_slices = util::div_rounding_up_safe(size, max_slice_bytes);
size_t slice_size = max_slice_bytes;
size_t slice_offset = 0;
for (size_t t = 0; t < n_slices; ++t) {
void* dst_slice = dst + slice_offset;

if (t == n_slices - 1) { slice_size = size % max_slice_bytes; }
slice_tasks.push_back(pool.submit(read_slice, dst_slice, slice_size, offset + slice_offset));

slice_offset += slice_size;
}
auto waiter = [](decltype(slice_tasks) slice_tasks) -> size_t {
return std::accumulate(slice_tasks.begin(), slice_tasks.end(), 0, [](auto sum, auto& task) {
return sum + task.get();
});
};
// The future returned from this function is deferred, not async becasue we want to avoid creating
// threads for each read_async call. This overhead is significant in case of multiple small reads.
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}

size_t cufile_input_impl::read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(shim->read(cf_file.handle(), dst, size, offset, 0) != -1,
"cuFile error reading from a file");
// always read the requested size for now
return size;
auto result = read_async(offset, size, dst, stream);
return result.get();
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/io/utilities/file_io_utilities.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#pragma once

#ifdef CUFILE_FOUND
#include "thread_pool.hpp"

#include <cufile.h>
#include <cudf_test/file_utilities.hpp>
#endif
Expand Down Expand Up @@ -106,6 +108,23 @@ class cufile_input : public cufile_io_base {
* @return The number of bytes read
*/
virtual size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) = 0;

/**
* @brief Asynchronously reads into existing device memory.
*
* @throws cudf::logic_error on cuFile error
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read as an std::future
*/
virtual std::future<size_t> read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) = 0;
};

/**
Expand Down Expand Up @@ -202,9 +221,15 @@ class cufile_input_impl final : public cufile_input {

size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) override;

std::future<size_t> read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override;

private:
cufile_shim const* shim = nullptr;
cufile_registered_file const cf_file;
cudf::detail::thread_pool pool;
};

/**
Expand Down
Loading

0 comments on commit a69a8a4

Please sign in to comment.