Skip to content

Commit

Permalink
Implement a chunked_pack API (#13260)
Browse files Browse the repository at this point in the history
This PR introduces a "chunked pack" API built on top of `contiguous_split`. This API is used when we want to copy a cuDF `table_view` over the wire or to the host in a contiguous layout (aka `contiguous_split`), but without user provided memory. As a result this API does not allocate any buffers for GPU data and instead it uses a provided user buffer to perform the `contiguous_split` in chunks. 

Luckily, `contiguous_split` already had a subdivision of work that we are now calling "batches". Each batch is up to 1MB of data from the source table. As such, one can think of this function as copying as many batches as will fit in a user buffer (or "chunking the batches"). The API follows other chunked interfaces in cuDF with a `has_next` and a `next`, with the difference that in this case `next` takes a `device_span`, and the user can provide any `device_span` as long as the size is the same as the size specified during construction.

When thrust and scratch space is required on the GPU, this PR makes use of the memory resource passed, even to the point of using the second argument of `exec_policy` which is the memory resource. I found this while testing that `exec_policy` defaults to the per-device resource, and in this case I really wanted to pass a pooled memory resource, outside of our normal async memory resource to set aside this memory ahead of time.

Most of the changes are about moving things around. You'll see 3 structs with the name "packed" (e.g. `packed_split_indices_and_src_buf_info`). These are here to group together state that `contiguous_split` needs to work, but now because of `chunked_pack` we need to keep around as well for the subsequent calls to `next`. These structs are also packed in memory, which is an optimization contiguous_split had already done to reduce the number of d2h/h2d copies. This PR did need to add a state object (`contiguous_split_state`) that now `contiguous_split` leverages. It also makes use of the `metadata_builder` which we added in a prior PR.

This PR does not include the JNI changes needed for this to work on the java side, I'll post that separately. I figured this was too big already (and if people have suggestions on "chunking" this PR up, I am happy to do that). 

@nvdbaranec spent a great deal of time documenting `contiguous_split` for me and he suggested a path to get this done that I just followed (thank you!!)

Authors:
  - Alessandro Bellina (https://github.com/abellina)

Approvers:
  - https://github.com/nvdbaranec
  - Nghia Truong (https://github.com/ttnghia)
  - David Wendt (https://github.com/davidwendt)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #13260
  • Loading branch information
abellina authored May 18, 2023
1 parent e883a11 commit c601b83
Show file tree
Hide file tree
Showing 4 changed files with 2,109 additions and 604 deletions.
90 changes: 81 additions & 9 deletions cpp/benchmarks/copying/contiguous_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,30 @@

#include <thrust/iterator/counting_iterator.h>

template <typename T>
void contiguous_split(cudf::table_view const& src_table, std::vector<cudf::size_type> const& splits)
{
auto result = cudf::contiguous_split(src_table, splits);
}

void chunked_pack(cudf::table_view const& src_table, std::vector<cudf::size_type> const&)
{
auto const mr = rmm::mr::get_current_device_resource();
auto const stream = cudf::get_default_stream();
auto user_buffer = rmm::device_uvector<std::uint8_t>(100L * 1024 * 1024, stream, mr);
auto chunked_pack = cudf::chunked_pack::create(src_table, user_buffer.size(), mr);
while (chunked_pack->has_next()) {
auto iter_size = chunked_pack->next(user_buffer);
}
stream.synchronize();
}

template <typename T, typename ContigSplitImpl>
void BM_contiguous_split_common(benchmark::State& state,
std::vector<T>& src_cols,
int64_t num_rows,
int64_t num_splits,
int64_t bytes_total)
int64_t bytes_total,
ContigSplitImpl& impl)
{
// generate splits
std::vector<cudf::size_type> splits;
Expand All @@ -57,16 +75,18 @@ void BM_contiguous_split_common(benchmark::State& state,

for (auto _ : state) {
cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0
auto result = cudf::contiguous_split(src_table, splits);
impl(src_table, splits);
}

// it's 2x bytes_total because we're both reading and writing.
state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * bytes_total * 2);
}

class ContiguousSplit : public cudf::benchmark {};
class ChunkedPack : public cudf::benchmark {};

void BM_contiguous_split(benchmark::State& state)
template <typename ContiguousSplitImpl>
void BM_contiguous_split(benchmark::State& state, ContiguousSplitImpl& impl)
{
int64_t const total_desired_bytes = state.range(0);
cudf::size_type const num_cols = state.range(1);
Expand All @@ -91,12 +111,14 @@ void BM_contiguous_split(benchmark::State& state)
(include_validity ? (max(int64_t{1}, (num_rows / 32)) * sizeof(cudf::bitmask_type) * num_cols)
: 0);

BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes);
BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes, impl);
}

class ContiguousSplitStrings : public cudf::benchmark {};
class ChunkedPackStrings : public cudf::benchmark {};

void BM_contiguous_split_strings(benchmark::State& state)
template <typename ContiguousSplitImpl>
void BM_contiguous_split_strings(benchmark::State& state, ContiguousSplitImpl& impl)
{
int64_t const total_desired_bytes = state.range(0);
cudf::size_type const num_cols = state.range(1);
Expand Down Expand Up @@ -133,13 +155,13 @@ void BM_contiguous_split_strings(benchmark::State& state)
(include_validity ? (max(int64_t{1}, (num_rows / 32)) * sizeof(cudf::bitmask_type) * num_cols)
: 0);

BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes);
BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes, impl);
}

#define CSBM_BENCHMARK_DEFINE(name, size, num_columns, num_splits, validity) \
BENCHMARK_DEFINE_F(ContiguousSplit, name)(::benchmark::State & state) \
{ \
BM_contiguous_split(state); \
BM_contiguous_split(state, contiguous_split); \
} \
BENCHMARK_REGISTER_F(ContiguousSplit, name) \
->Args({size, num_columns, num_splits, validity}) \
Expand Down Expand Up @@ -168,7 +190,7 @@ CSBM_BENCHMARK_DEFINE(1Gb1ColValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024,
#define CSBM_STRINGS_BENCHMARK_DEFINE(name, size, num_columns, num_splits, validity) \
BENCHMARK_DEFINE_F(ContiguousSplitStrings, name)(::benchmark::State & state) \
{ \
BM_contiguous_split_strings(state); \
BM_contiguous_split_strings(state, contiguous_split); \
} \
BENCHMARK_REGISTER_F(ContiguousSplitStrings, name) \
->Args({size, num_columns, num_splits, validity}) \
Expand All @@ -189,3 +211,53 @@ CSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsNoValidity, (int64_t)1 * 1024 * 1024 * 10
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 256, 1);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb1ColNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 0);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb1ColValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);

#define CCSBM_BENCHMARK_DEFINE(name, size, num_columns, num_splits, validity) \
BENCHMARK_DEFINE_F(ChunkedPack, name)(::benchmark::State & state) \
{ \
BM_contiguous_split(state, chunked_pack); \
} \
BENCHMARK_REGISTER_F(ChunkedPack, name) \
->Args({size, num_columns, num_splits, validity}) \
->Unit(benchmark::kMillisecond) \
->UseManualTime() \
->Iterations(8)
CCSBM_BENCHMARK_DEFINE(6Gb512ColsNoValidity, (int64_t)6 * 1024 * 1024 * 1024, 512, 0, 0);
CCSBM_BENCHMARK_DEFINE(6Gb512ColsValidity, (int64_t)6 * 1024 * 1024 * 1024, 512, 0, 1);
CCSBM_BENCHMARK_DEFINE(6Gb10ColsNoValidity, (int64_t)6 * 1024 * 1024 * 1024, 10, 0, 0);
CCSBM_BENCHMARK_DEFINE(6Gb10ColsValidity, (int64_t)6 * 1024 * 1024 * 1024, 10, 0, 1);

CCSBM_BENCHMARK_DEFINE(4Gb512ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 0, 0);
CCSBM_BENCHMARK_DEFINE(4Gb512ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 0, 1);
CCSBM_BENCHMARK_DEFINE(4Gb10ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 0, 0);
CCSBM_BENCHMARK_DEFINE(4Gb10ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 0, 1);
CCSBM_BENCHMARK_DEFINE(4Gb4ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 1);

CCSBM_BENCHMARK_DEFINE(1Gb512ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 0, 0);
CCSBM_BENCHMARK_DEFINE(1Gb512ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 0, 1);
CCSBM_BENCHMARK_DEFINE(1Gb10ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 0, 0);
CCSBM_BENCHMARK_DEFINE(1Gb10ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 0, 1);
CCSBM_BENCHMARK_DEFINE(1Gb1ColValidity, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);

#define CCSBM_STRINGS_BENCHMARK_DEFINE(name, size, num_columns, num_splits, validity) \
BENCHMARK_DEFINE_F(ChunkedPackStrings, name)(::benchmark::State & state) \
{ \
BM_contiguous_split_strings(state, chunked_pack); \
} \
BENCHMARK_REGISTER_F(ChunkedPackStrings, name) \
->Args({size, num_columns, num_splits, validity}) \
->Unit(benchmark::kMillisecond) \
->UseManualTime() \
->Iterations(8)

CCSBM_STRINGS_BENCHMARK_DEFINE(4Gb512ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 0, 0);
CCSBM_STRINGS_BENCHMARK_DEFINE(4Gb512ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 0, 1);
CCSBM_STRINGS_BENCHMARK_DEFINE(4Gb10ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 0, 0);
CCSBM_STRINGS_BENCHMARK_DEFINE(4Gb10ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 0, 1);
CCSBM_STRINGS_BENCHMARK_DEFINE(4Gb4ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 1);

CCSBM_STRINGS_BENCHMARK_DEFINE(1Gb512ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 0, 0);
CCSBM_STRINGS_BENCHMARK_DEFINE(1Gb512ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 0, 1);
CCSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 0, 0);
CCSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 0, 1);
CCSBM_STRINGS_BENCHMARK_DEFINE(1Gb1ColValidity, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);
147 changes: 147 additions & 0 deletions cpp/include/cudf/contiguous_split.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,153 @@ std::vector<packed_table> contiguous_split(
std::vector<size_type> const& splits,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

namespace detail {
struct contiguous_split_state;
};

/**
* @brief Perform a chunked "pack" operation of the input `table_view` using a user provided
* buffer of size `user_buffer_size`.
*
* The intent of this operation is to be used in a streamed fashion at times of GPU
* out-of-memory, where we want to minimize the number of small cudaMemcpy calls and
* tracking of all the metadata associated with cudf tables. Because of the memory constraints,
* all thrust and scratch memory allocations are using the passed-in memory resource exclusively,
* not a per-device memory resource.
*
* This class defines two methods that must be used in concert to carry out the chunked_pack:
* has_next and next. Here is an example:
*
* @code{.pseudo}
* // Create a table_view
* cudf::table_view tv = ...;
*
* // Choose a memory resource (optional). This memory resource is used for scratch/thrust temporary
* // data. In memory constrained cases, this can be used to set aside scratch memory
* // for `chunked_pack` at the beginning of a program.
* auto mr = rmm::mr::get_current_device_resource();
*
* // Define a buffer size for each chunk: the larger the buffer is, the more SMs can be
* // occupied by this algorithm.
* //
* // Internally, the GPU unit of work is a 1MB batch. When we instantiate `cudf::chunked_pack`,
* // all the 1MB batches for the source table_view are computed up front. Additionally,
* // chunked_pack calculates the number of iterations that are required to go through all those
* // batches given a `user_buffer_size` buffer. The number of 1MB batches in each iteration (chunk)
* // equals the number of CUDA blocks that will be used for the main kernel launch.
* //
* std::size_t user_buffer_size = 128*1024*1024;
*
* auto chunked_packer = cudf::chunked_pack::create(tv, user_buffer_size, mr);
*
* std::size_t host_offset = 0;
* auto host_buffer = ...; // obtain a host buffer you would like to copy to
*
* while (chunked_packer->has_next()) {
* // get a user buffer of size `user_buffer_size`
* cudf::device_span<uint8_t> user_buffer = ...;
* std::size_t bytes_copied = chunked_packer->next(user_buffer);
*
* // buffer will hold the contents of at most `user_buffer_size` bytes
* // of the contiguously packed input `table_view`. You are now free to copy
* // this memory somewhere else, for example, to host.
* cudaMemcpyAsync(
* host_buffer.data() + host_offset,
* user_buffer.data(),
* bytes_copied,
* cudaMemcpyDefault,
* stream);
*
* host_offset += bytes_copied;
* }
* @endcode
*/
class chunked_pack {
public:
/**
* @brief Construct a `chunked_pack` class.
*
* @param input source `table_view` to pack
* @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
* at least 1MB
* @param temp_mr An optional memory resource to be used for temporary and scratch allocations
* only
*/
explicit chunked_pack(
cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::mr::device_memory_resource* temp_mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor that will be implemented as default. Declared with definition here because
* contiguous_split_state is incomplete at this stage.
*/
~chunked_pack();

/**
* @brief Obtain the total size of the contiguously packed `table_view`.
*
* @return total size (in bytes) of all the chunks
*/
[[nodiscard]] std::size_t get_total_contiguous_size() const;

/**
* @brief Function to check if there are chunks left to be copied.
*
* @return true if there are chunks left to be copied, and false otherwise
*/
[[nodiscard]] bool has_next() const;

/**
* @brief Packs the next chunk into `user_buffer`. This should be called as long as
* `has_next` returns true. If `next` is called when `has_next` is false, an exception
* is thrown.
*
* @throws cudf::logic_error If the size of `user_buffer` is different than `user_buffer_size`
* @throws cudf::logic_error If called after all chunks have been copied
*
* @param user_buffer device span target for the chunk. The size of this span must equal
* the `user_buffer_size` parameter passed at construction
* @return The number of bytes that were written to `user_buffer` (at most
* `user_buffer_size`)
*/
[[nodiscard]] std::size_t next(cudf::device_span<uint8_t> const& user_buffer);

/**
* @brief Build the opaque metadata for all added columns.
*
* @return A vector containing the serialized column metadata
*/
[[nodiscard]] std::unique_ptr<std::vector<uint8_t>> build_metadata() const;

/**
* @brief Creates a `chunked_pack` instance to perform a "pack" of the `table_view`
* "input", where a buffer of `user_buffer_size` is filled with chunks of the
* overall operation. This operation can be used in cases where GPU memory is constrained.
*
* The memory resource (`temp_mr`) could be a special memory resource to be used in
* situations when GPU memory is low and we want scratch and temporary allocations to
* happen from a small reserved pool of memory. Note that it defaults to the regular cuDF
* per-device resource.
*
* @throws cudf::logic_error When user_buffer_size is less than 1MB
*
* @param input source `table_view` to pack
* @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
* at least 1MB
* @param temp_mr RMM memory resource to be used for temporary and scratch allocations only
* @return a unique_ptr of chunked_pack
*/
[[nodiscard]] static std::unique_ptr<chunked_pack> create(
cudf::table_view const& input,
std::size_t user_buffer_size,
rmm::mr::device_memory_resource* temp_mr = rmm::mr::get_current_device_resource());

private:
// internal state of contiguous split
std::unique_ptr<detail::contiguous_split_state> state;
};

/**
* @brief Deep-copy a `table_view` into a serialized contiguous memory format.
*
Expand Down
Loading

0 comments on commit c601b83

Please sign in to comment.