Refactor pinned memory vector and ORC+Parquet writers #13206

Merged
merged 19 commits on Apr 25, 2023
Changes from 9 commits
2 changes: 1 addition & 1 deletion conda/recipes/libcudf/meta.yaml
@@ -139,7 +139,7 @@ outputs:
- test -f $PREFIX/include/cudf/detail/utilities/integer_utils.hpp
- test -f $PREFIX/include/cudf/detail/utilities/linked_column.hpp
- test -f $PREFIX/include/cudf/detail/utilities/logger.hpp
- test -f $PREFIX/include/cudf/detail/utilities/pinned_allocator.hpp
- test -f $PREFIX/include/cudf/detail/utilities/pinned_host_vector.hpp
- test -f $PREFIX/include/cudf/detail/utilities/vector_factories.hpp
- test -f $PREFIX/include/cudf/detail/utilities/visitor_overload.hpp
- test -f $PREFIX/include/cudf/dictionary/detail/concatenate.hpp
5 changes: 2 additions & 3 deletions cpp/benchmarks/io/text/multibyte_split.cpp
@@ -23,7 +23,7 @@
#include <cudf_test/file_utilities.hpp>

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/utilities/pinned_allocator.hpp>
#include <cudf/detail/utilities/pinned_host_vector.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/text/data_chunk_source_factories.hpp>
#include <cudf/io/text/detail/bgzip_utils.hpp>
@@ -33,7 +33,6 @@
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <thrust/host_vector.h>
#include <thrust/transform.h>

#include <nvbench/nvbench.cuh>
@@ -136,7 +135,7 @@ static void bench_multibyte_split(nvbench::state& state,
std::unique_ptr<cudf::io::datasource> datasource;
auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim);
auto host_input = std::vector<char>{};
auto host_pinned_input = thrust::host_vector<char, cudf::detail::pinned_allocator<char>>{};
auto host_pinned_input = cudf::detail::pinned_host_vector<char>{};

if (source_type != data_chunk_source_type::device &&
source_type != data_chunk_source_type::host_pinned) {
Member

IIUC, pinned memory is always on the host side, so I'm not sure this renaming is really needed.

Contributor Author
@ttnghia Apr 25, 2023

IMO the name *_host_vector better expresses its purpose, similar to having thrust::host_vector instead of just thrust::vector.

Contributor

I read the name as host_vector in pinned memory, so the name looks good.
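As an aside (an illustrative sketch, not part of the PR), here is what the new alias looks like at a call site, mirroring the multibyte_split benchmark change above; the function name is hypothetical:

#include <cudf/detail/utilities/pinned_host_vector.hpp>

#include <thrust/host_vector.h>

void alias_example()
{
  // Before: the pinned allocator had to be spelled out at every call site.
  thrust::host_vector<char, cudf::detail::pinned_allocator<char>> verbose(1024);

  // After: the alias names the same pinned host_vector type more directly.
  cudf::detail::pinned_host_vector<char> concise(1024);
}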

@@ -1,5 +1,5 @@
/*
* Copyright 2008-2022 NVIDIA Corporation
* Copyright 2008-2023 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@

#include <cudf/utilities/error.hpp>

#include <thrust/host_vector.h>

namespace cudf::detail {

/*! \p pinned_allocator is a CUDA-specific host memory allocator
@@ -199,4 +201,11 @@ class pinned_allocator {
return !operator==(x);
}
};

/**
* @brief A vector class with pinned host memory allocator
*/
template <typename T>
using pinned_host_vector = thrust::host_vector<T, pinned_allocator<T>>;

} // namespace cudf::detail
116 changes: 46 additions & 70 deletions cpp/src/io/orc/writer_impl.cu
@@ -28,6 +28,7 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/pinned_host_vector.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/bit.hpp>
@@ -79,11 +80,6 @@ struct row_group_index_info {
};

namespace {
/**
* @brief Helper for pinned host memory
*/
template <typename T>
using pinned_buffer = std::unique_ptr<T, decltype(&cudaFreeHost)>;
Contributor

This looks good, with one minor concern. For places that already used host_vector, there's no change. But for this use case we are introducing initialization into memory that does not need to be initialized (and previously wasn't). If you don't mind, please run the ORC or Parquet writer benchmarks.

Contributor Author

Sorry, I don't see the initialization you mentioned. The new pinned_host_vector uses an allocator that also doesn't initialize the internal buffer:

__host__ inline pointer allocate(size_type cnt, const_pointer /*hint*/ = 0)
  {
    if (cnt > this->max_size()) { throw std::bad_alloc(); }  // end if

    pointer result(0);
    CUDF_CUDA_TRY(cudaMallocHost(reinterpret_cast<void**>(&result), cnt * sizeof(value_type)));
    return result;
  }

Contributor Author

Please see benchmarks here: #13206 (comment)

Contributor
@vuule Apr 24, 2023

I thought that host_vector has its own initialization outside of the allocator (same as std::vector). Either way, it does not seem to impact the overall performance.
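For illustration (an editor's sketch, not from the PR), the difference being discussed: pinned_allocator::allocate() only reserves pinned memory, but constructing a sized pinned_host_vector still value-initializes its elements, whereas the old cudaMallocHost-based pinned_buffer left its contents indeterminate. The helper below is hypothetical:

#include <cudf/detail/utilities/pinned_host_vector.hpp>
#include <cudf/utilities/error.hpp>

#include <cuda_runtime.h>

#include <cstddef>
#include <cstdint>

void bounce_buffer_styles(std::size_t n)
{
  // Old style: raw pinned allocation; byte values are indeterminate until written.
  std::uint8_t* raw = nullptr;
  CUDF_CUDA_TRY(cudaMallocHost(reinterpret_cast<void**>(&raw), n));
  CUDF_CUDA_TRY(cudaFreeHost(raw));

  // New style: allocate() reserves pinned memory, then the vector value-initializes
  // all n elements (the extra work discussed above); per the benchmarks this has
  // no measurable impact on the ORC/Parquet writers.
  cudf::detail::pinned_host_vector<std::uint8_t> buf(n);
}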


/**
* @brief Translates ORC compression to nvCOMP compression
@@ -379,11 +375,11 @@ __global__ void copy_string_data(char* string_pool,
} // namespace

void persisted_statistics::persist(int num_table_rows,
bool single_write_mode,
SingleWriteMode single_write_mode,
ttnghia marked this conversation as resolved.
intermediate_statistics& intermediate_stats,
rmm::cuda_stream_view stream)
{
if (not single_write_mode) {
if (single_write_mode == SingleWriteMode::YES) {
ttnghia marked this conversation as resolved.
// persist the strings in the chunks into a string pool and update pointers
auto const num_chunks = static_cast<int>(intermediate_stats.stripe_stat_chunks.size());
// min offset and max offset + 1 for total size
@@ -670,7 +666,7 @@ orc_streams create_streams(host_span<orc_column_view> columns,
std::map<uint32_t, size_t> const& decimal_column_sizes,
bool enable_dictionary,
CompressionKind compression_kind,
bool single_write_mode)
SingleWriteMode single_write_mode)
{
// 'column 0' row index stream
std::vector<Stream> streams{{ROW_INDEX, 0}}; // TODO: Separate index and data streams?
@@ -685,7 +681,7 @@ orc_streams create_streams(host_span<orc_column_view> columns,

for (auto& column : columns) {
auto const is_nullable = [&]() -> bool {
if (single_write_mode) {
if (single_write_mode == SingleWriteMode::YES) {
return column.nullable();
} else {
// For chunked write, when not provided nullability, we assume the worst case scenario
@@ -2196,28 +2192,17 @@ std::unique_ptr<table_input_metadata> make_table_meta(table_view const& input)
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of the intermediate results containing the processed data
*/
std::tuple<orc_streams,
hostdevice_vector<compression_result>,
hostdevice_2dvector<gpu::StripeStream>,
encoded_data,
file_segmentation,
hostdevice_2dvector<gpu::StripeDictionary>,
std::vector<StripeInformation>,
orc_table_view,
rmm::device_buffer,
intermediate_statistics,
pinned_buffer<uint8_t>>
convert_table_to_orc_data(table_view const& input,
table_input_metadata const& table_meta,
stripe_size_limits max_stripe_size,
size_type row_index_stride,
bool enable_dictionary,
CompressionKind compression_kind,
size_t compression_blocksize,
statistics_freq stats_freq,
bool single_write_mode,
data_sink const& out_sink,
rmm::cuda_stream_view stream)
auto convert_table_to_orc_data(table_view const& input,
table_input_metadata const& table_meta,
stripe_size_limits max_stripe_size,
size_type row_index_stride,
bool enable_dictionary,
CompressionKind compression_kind,
size_t compression_blocksize,
statistics_freq stats_freq,
SingleWriteMode single_write_mode,
data_sink const& out_sink,
rmm::cuda_stream_view stream)
{
auto const input_tview = table_device_view::create(input, stream);

@@ -2288,17 +2273,17 @@ convert_table_to_orc_data(table_view const& input,
auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data, &strm_descs, stream);

if (num_rows == 0) {
return {std::move(streams),
hostdevice_vector<compression_result>{}, // comp_results
std::move(strm_descs),
std::move(enc_data),
std::move(segmentation),
std::move(stripe_dict),
std::move(stripes),
std::move(orc_table),
rmm::device_buffer{}, // compressed_data
intermediate_statistics{stream},
pinned_buffer<uint8_t>{nullptr, cudaFreeHost}};
return std::tuple{std::move(streams),
hostdevice_vector<compression_result>{}, // comp_results
std::move(strm_descs),
std::move(enc_data),
std::move(segmentation),
std::move(stripe_dict),
std::move(stripes),
std::move(orc_table),
rmm::device_buffer{}, // compressed_data
intermediate_statistics{stream},
cudf::detail::pinned_host_vector<uint8_t>()};
}

// Allocate intermediate output stream buffer
@@ -2312,7 +2297,7 @@ convert_table_to_orc_data(table_view const& input,
auto const padded_block_header_size =
util::round_up_unsafe<size_t>(block_header_size, compressed_block_align);

auto stream_output = [&]() {
auto bounce_buffer = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;

@@ -2333,16 +2318,7 @@ convert_table_to_orc_data(table_view const& input,
max_stream_size = std::max(max_stream_size, stream_size);
}

if (all_device_write) {
return pinned_buffer<uint8_t>{nullptr, cudaFreeHost};
} else {
return pinned_buffer<uint8_t>{[](size_t size) {
uint8_t* ptr = nullptr;
CUDF_CUDA_TRY(cudaMallocHost(&ptr, size));
return ptr;
}(max_stream_size),
cudaFreeHost};
}
return cudf::detail::pinned_host_vector<uint8_t>(all_device_write ? 0 : max_stream_size);
}();

// Compress the data streams
@@ -2374,17 +2350,17 @@ convert_table_to_orc_data(table_view const& input,

auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream);

return {std::move(streams),
std::move(comp_results),
std::move(strm_descs),
std::move(enc_data),
std::move(segmentation),
std::move(stripe_dict),
std::move(stripes),
std::move(orc_table),
std::move(compressed_data),
std::move(intermediate_stats),
std::move(stream_output)};
return std::tuple{std::move(streams),
std::move(comp_results),
std::move(strm_descs),
std::move(enc_data),
std::move(segmentation),
std::move(stripe_dict),
std::move(stripes),
std::move(orc_table),
std::move(compressed_data),
std::move(intermediate_stats),
std::move(bounce_buffer)};
}

} // namespace
@@ -2399,7 +2375,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_kind(to_orc_compression(options.get_compression())),
_compression_blocksize(compression_block_size(_compression_kind)),
_stats_freq(options.get_statistics_freq()),
_single_write_mode(mode == SingleWriteMode::YES),
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
{
@@ -2419,7 +2395,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_kind(to_orc_compression(options.get_compression())),
_compression_blocksize(compression_block_size(_compression_kind)),
_stats_freq(options.get_statistics_freq()),
_single_write_mode(mode == SingleWriteMode::YES),
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
{
@@ -2458,7 +2434,7 @@ void writer::impl::write(table_view const& input)
orc_table,
compressed_data,
intermediate_stats,
stream_output] = [&] {
bounce_buffer] = [&] {
try {
return convert_table_to_orc_data(input,
*_table_meta,
@@ -2489,7 +2465,7 @@ void writer::impl::write(table_view const& input)
orc_table,
compressed_data,
intermediate_stats,
stream_output.get());
bounce_buffer);

// Update data into the footer. This needs to be called even when num_rows==0.
add_table_to_footer_data(orc_table, stripes);
@@ -2504,7 +2480,7 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams,
orc_table_view const& orc_table,
rmm::device_buffer const& compressed_data,
intermediate_statistics& intermediate_stats,
uint8_t* stream_output)
host_span<uint8_t> bounce_buffer)
{
if (orc_table.num_rows() == 0) { return; }

@@ -2544,7 +2520,7 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams,
strm_desc,
enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output,
bounce_buffer.data(),
&stripe,
&streams,
_compression_kind,
12 changes: 6 additions & 6 deletions cpp/src/io/orc/writer_impl.hpp
@@ -209,7 +209,7 @@ struct persisted_statistics {
}

void persist(int num_table_rows,
bool single_write_mode,
SingleWriteMode single_write_mode,
intermediate_statistics& intermediate_stats,
rmm::cuda_stream_view stream);

Expand Down Expand Up @@ -302,7 +302,7 @@ class writer::impl {
* @param orc_table Non-owning view of a cuDF table that includes ORC-related information
* @param compressed_data Compressed stream data
* @param intermediate_stats Statistics data stored between calls to write
* @param stream_output Temporary host output buffer
* @param bounce_buffer Temporary host output buffer
*/
void write_orc_data_to_sink(orc_streams& streams,
hostdevice_vector<compression_result> const& comp_results,
@@ -313,7 +313,7 @@ class writer::impl {
orc_table_view const& orc_table,
rmm::device_buffer const& compressed_data,
intermediate_statistics& intermediate_stats,
uint8_t* stream_output);
host_span<uint8_t> bounce_buffer);

/**
* @brief Add the processed table data into the internal file footer.
@@ -334,9 +334,9 @@
CompressionKind const _compression_kind;
size_t const _compression_blocksize;
statistics_freq const _stats_freq;
bool const _single_write_mode; // Special parameter only used by `write()` to indicate that
// we are guaranteeing a single table write. This enables some
// internal optimizations.
SingleWriteMode const _single_write_mode; // Special parameter only used by `write()` to indicate
// that we are guaranteeing a single table write. This
// enables some internal optimizations.
std::map<std::string, std::string> const _kv_meta; // Optional user metadata.
std::unique_ptr<data_sink> const _out_sink;
