Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist string statistics data across multiple calls to ORC chunked write #10694

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ ConfigureBench(PARQUET_WRITER_BENCH io/parquet/parquet_writer.cpp)
# * orc writer benchmark --------------------------------------------------------------------------
ConfigureBench(ORC_WRITER_BENCH io/orc/orc_writer.cpp)

# ##################################################################################################
# * orc writer chunks benchmark -------------------------------------------------------------------
ConfigureBench(ORC_WRITER_CHUNKS_BENCH io/orc/orc_writer_chunks.cpp)

# ##################################################################################################
# * csv writer benchmark --------------------------------------------------------------------------
ConfigureBench(CSV_WRITER_BENCH io/csv/csv_writer.cpp)
Expand Down
123 changes: 123 additions & 0 deletions cpp/benchmarks/io/orc/orc_writer_chunks.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/io/cuio_common.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf/column/column.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/table/table.hpp>

// to enable, run cmake with -DBUILD_BENCHMARKS=ON

// Total bytes of column data generated per benchmark table.
// NOTE(review): this is 512 MiB (512 << 20), while the benchmark names below say
// "3Gb" — confirm which is intended.
constexpr int64_t data_size = 512 << 20;

namespace cudf_io = cudf::io;

// Google-benchmark fixtures (presumably cudf::benchmark sets up the memory
// resource used by the generators/writer — confirm in benchmark_fixture.hpp).
class OrcWrite : public cudf::benchmark {
};
class OrcWriteChunked : public cudf::benchmark {
};

// Benchmarks a single (non-chunked) ORC write of one table with `num_cols`
// columns totalling ~data_size bytes. Reports bytes processed, peak memory
// usage and the encoded file size as counters.
// Fix: removed scraped review-UI noise lines that were spliced into the
// create_random_table() call and broke compilation.
void ORC_write(benchmark::State& state)
{
  cudf::size_type const num_cols = state.range(0);

  // Cycle a representative mix of fixed-width, string and decimal types
  // across the requested number of columns.
  auto tbl = create_random_table(cycle_dtypes({cudf::type_id::INT32,
                                               cudf::type_id::FLOAT64,
                                               cudf::type_id::STRING,
                                               cudf::type_id::DECIMAL32,
                                               cudf::type_id::INT8},
                                              num_cols),
                                 table_size_bytes{data_size});
  cudf::table_view view = tbl->view();

  auto mem_stats_logger = cudf::memory_stats_logger();
  cuio_source_sink_pair source_sink(io_type::VOID);
  for (auto _ : state) {
    cuda_event_timer raii(state, true);  // flush_l2_cache = true, stream = 0
    cudf_io::orc_writer_options opts =
      cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view);
    cudf_io::write_orc(opts);
  }

  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * data_size);
  state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage();
  state.counters["encoded_file_size"] = source_sink.size();
}

// Benchmarks the chunked ORC writer: `num_tables` tables (data_size bytes in
// total, split evenly) are written through one orc_chunked_writer per
// iteration, exercising statistics persistence across write() calls.
// Fix: removed scraped review-UI noise lines that were spliced into the
// create_random_table() call and broke compilation.
void ORC_write_chunked(benchmark::State& state)
{
  cudf::size_type const num_cols   = state.range(0);
  cudf::size_type const num_tables = state.range(1);

  std::vector<std::unique_ptr<cudf::table>> tables;
  for (cudf::size_type idx = 0; idx < num_tables; idx++) {
    tables.push_back(create_random_table(cycle_dtypes({cudf::type_id::INT32,
                                                       cudf::type_id::FLOAT64,
                                                       cudf::type_id::STRING,
                                                       cudf::type_id::DECIMAL32,
                                                       cudf::type_id::INT8},
                                                      num_cols),
                                         table_size_bytes{size_t(data_size / num_tables)}));
  }

  auto mem_stats_logger = cudf::memory_stats_logger();
  cuio_source_sink_pair source_sink(io_type::VOID);
  for (auto _ : state) {
    cuda_event_timer raii(state, true);  // flush_l2_cache = true, stream = 0
    cudf_io::chunked_orc_writer_options opts =
      cudf_io::chunked_orc_writer_options::builder(source_sink.make_sink_info());
    cudf_io::orc_chunked_writer writer(opts);
    std::for_each(tables.begin(), tables.end(), [&writer](std::unique_ptr<cudf::table> const& tbl) {
      writer.write(*tbl);
    });
    writer.close();
  }

  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * data_size);
  state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage();
  state.counters["encoded_file_size"] = source_sink.size();
}

// Defines and registers a non-chunked ORC write benchmark over `num_columns`
// columns (total size is always `data_size`).
// Fix: dropped the `size` macro parameter, which was never used in the
// expansion, and removed scraped review-UI noise lines.
#define OWBM_BENCHMARK_DEFINE(name, num_columns)                                        \
  BENCHMARK_DEFINE_F(OrcWrite, name)(::benchmark::State & state) { ORC_write(state); }  \
  BENCHMARK_REGISTER_F(OrcWrite, name)                                                  \
    ->Args({num_columns})                                                               \
    ->Unit(benchmark::kMillisecond)                                                     \
    ->UseManualTime()

// NOTE(review): data_size is 512 MiB, so the "3Gb" in these names looks stale — confirm.
OWBM_BENCHMARK_DEFINE(3Gb8Cols, 8);
OWBM_BENCHMARK_DEFINE(3Gb1024Cols, 1024);

// Defines and registers a chunked ORC write benchmark over `num_columns`
// columns split into `num_chunks` tables. Iterations are capped at 4 because
// each iteration re-encodes all chunks.
// Fix: benchmark names previously claimed 64/128 chunks while the Args passed
// 8/64; names now match the actual chunk counts. Also removed scraped
// review-UI noise lines.
#define OWCBM_BENCHMARK_DEFINE(name, num_columns, num_chunks)           \
  BENCHMARK_DEFINE_F(OrcWriteChunked, name)(::benchmark::State & state) \
  {                                                                     \
    ORC_write_chunked(state);                                           \
  }                                                                     \
  BENCHMARK_REGISTER_F(OrcWriteChunked, name)                           \
    ->Args({num_columns, num_chunks})                                   \
    ->Unit(benchmark::kMillisecond)                                     \
    ->UseManualTime()                                                   \
    ->Iterations(4)

OWCBM_BENCHMARK_DEFINE(3Gb8Cols8Chunks, 8, 8);
OWCBM_BENCHMARK_DEFINE(3Gb1024Cols8Chunks, 1024, 8);

OWCBM_BENCHMARK_DEFINE(3Gb8Cols64Chunks, 8, 64);
OWCBM_BENCHMARK_DEFINE(3Gb1024Cols64Chunks, 1024, 64);
194 changes: 140 additions & 54 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
#include <numeric>
#include <utility>

#include <cooperative_groups.h>
#include <cooperative_groups/memcpy_async.h>

#include <cuda/std/limits>

namespace cudf {
Expand Down Expand Up @@ -1233,55 +1236,61 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs(
auto const num_stripe_blobs =
thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size());
auto const num_file_blobs = num_columns;
auto const num_blobs = single_write_mode ? static_cast<int>(num_stripe_blobs + num_file_blobs)
: static_cast<int>(num_stripe_blobs);
auto const num_blobs = static_cast<int>(num_stripe_blobs + num_file_blobs);

if (num_stripe_blobs == 0) { return {}; }

// merge the stripe persisted data and add file data
rmm::device_uvector<statistics_chunk> stat_chunks(num_blobs, stream);
hostdevice_vector<statistics_merge_group> stats_merge(num_blobs, stream);

size_t chunk_offset = 0;
size_t merge_offset = 0;
// we need to merge the stat arrays from the persisted data.
// this needs to be done carefully because each array can contain
// a different number of stripes and stripes from each column must be
// located next to each other. We know the total number of stripes and
// we know the size of each array. The number of stripes per column in a chunk array can
// be calculated by dividing the number of chunks by the number of columns.
// That many chunks need to be copied at a time to the proper destination.
size_t num_entries_seen = 0;
for (size_t i = 0; i < per_chunk_stats.stripe_stat_chunks.size(); ++i) {
auto chunk_bytes = per_chunk_stats.stripe_stat_chunks[i].size() * sizeof(statistics_chunk);
auto merge_bytes = per_chunk_stats.stripe_stat_merge[i].size() * sizeof(statistics_merge_group);
cudaMemcpyAsync(stat_chunks.data() + chunk_offset,
per_chunk_stats.stripe_stat_chunks[i].data(),
chunk_bytes,
cudaMemcpyDeviceToDevice,
stream);
cudaMemcpyAsync(stats_merge.device_ptr() + merge_offset,
per_chunk_stats.stripe_stat_merge[i].device_ptr(),
merge_bytes,
cudaMemcpyDeviceToDevice,
stream);
chunk_offset += per_chunk_stats.stripe_stat_chunks[i].size();
merge_offset += per_chunk_stats.stripe_stat_merge[i].size();
auto const stripes_per_col = per_chunk_stats.stripe_stat_chunks[i].size() / num_columns;

auto const chunk_bytes = stripes_per_col * sizeof(statistics_chunk);
auto const merge_bytes = stripes_per_col * sizeof(statistics_merge_group);
for (size_t col = 0; col < num_columns; ++col) {
cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + num_entries_seen,
vyasr marked this conversation as resolved.
Show resolved Hide resolved
per_chunk_stats.stripe_stat_chunks[i].data() + col * stripes_per_col,
chunk_bytes,
cudaMemcpyDeviceToDevice,
stream);
cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + num_entries_seen,
per_chunk_stats.stripe_stat_merge[i].device_ptr() + col * stripes_per_col,
merge_bytes,
cudaMemcpyDeviceToDevice,
stream);
}
num_entries_seen += stripes_per_col;
}

if (single_write_mode) {
vyasr marked this conversation as resolved.
Show resolved Hide resolved
std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
for (auto i = 0u; i < num_file_blobs; ++i) {
auto col_stats = &file_stats_merge[i];
col_stats->col_dtype = per_chunk_stats.col_types[i];
col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
col_stats->num_chunks = static_cast<uint32_t>(num_stripes);
}
std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
for (auto i = 0u; i < num_file_blobs; ++i) {
auto col_stats = &file_stats_merge[i];
col_stats->col_dtype = per_chunk_stats.col_types[i];
col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
col_stats->num_chunks = static_cast<uint32_t>(num_stripes);
}

auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
cudaMemcpyAsync(d_file_stats_merge,
file_stats_merge.data(),
num_file_blobs * sizeof(statistics_merge_group),
cudaMemcpyHostToDevice,
stream);
auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
cudaMemcpyAsync(d_file_stats_merge,
file_stats_merge.data(),
num_file_blobs * sizeof(statistics_merge_group),
cudaMemcpyHostToDevice,
stream);

auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
detail::merge_group_statistics<detail::io_file_format::ORC>(
file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);
}
auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
detail::merge_group_statistics<detail::io_file_format::ORC>(
file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);

hostdevice_vector<uint8_t> blobs =
allocate_and_encode_blobs(stats_merge, stat_chunks, num_blobs, stream);
Expand All @@ -1295,14 +1304,12 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs(
stripe_blobs[i].assign(stat_begin, stat_end);
}

std::vector<ColStatsBlob> file_blobs(single_write_mode ? num_file_blobs : 0);
if (single_write_mode) {
auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
for (auto i = 0u; i < num_file_blobs; i++) {
auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + file_stat_merge[i].num_chunks;
file_blobs[i].assign(stat_begin, stat_end);
}
std::vector<ColStatsBlob> file_blobs(num_file_blobs);
auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
for (auto i = 0u; i < num_file_blobs; i++) {
auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + file_stat_merge[i].num_chunks;
file_blobs[i].assign(stat_begin, stat_end);
}

return {std::move(stripe_blobs), std::move(file_blobs)};
Expand Down Expand Up @@ -1937,6 +1944,91 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table,
std::move(is_dict_enabled)};
}

// Device functor returning the byte length of the i-th statistic string:
// even i -> the (i/2)-th chunk's min string, odd i -> its max string. Returns
// 0 for non-string statistics and for the one-past-the-end index, so an
// exclusive scan over [0, num_chunks * 2] also yields the total pool size.
// Fix: removed scraped review-UI noise lines that were spliced into the body.
struct string_length_functor {
  __device__ inline size_type operator()(int const i) const
  {
    // we translate from 0 -> num_chunks * 2 because each statistic has a min and max
    // string and we need to calculate lengths for both.
    if (i >= num_chunks * 2) return 0;

    // min strings are even values, max strings are odd values of i
    auto const min = i % 2 == 0;
    // index of the chunk
    auto const idx = i / 2;
    auto& str_val =
      min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val;
    auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string;
    return str ? str_val.length : 0;
  }

  int const num_chunks;
  statistics_chunk const* stripe_stat_chunks;
  statistics_merge_group const* stripe_stat_merge;
};

// Kernel that copies each string statistic into the owned `string_pool` and
// re-points the statistic at its pooled copy. Launched with two blocks per
// chunk: even blockIdx.x handles the chunk's min string, odd handles the max.
// `offsets` (the exclusive scan of string lengths) gives each string's
// destination offset within the pool.
// Fix: removed scraped review-UI noise lines that were spliced into the body.
__global__ void copy_string_data(char* string_pool,
                                 size_type* offsets,
                                 statistics_chunk* chunks,
                                 statistics_merge_group const* groups)
{
  auto const idx = blockIdx.x / 2;
  if (groups[idx].stats_dtype == dtype_string) {
    auto const min = blockIdx.x % 2 == 0;
    auto& str_val  = min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val;
    auto dst       = &string_pool[offsets[blockIdx.x]];
    auto src       = str_val.ptr;

    // threads of the block cooperatively copy the string's bytes
    for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
      dst[i] = src[i];
    }
    // a single thread updates the statistic to point at the pooled copy
    if (threadIdx.x == 0) { str_val.ptr = dst; }
  }
}

// Moves one write()'s intermediate statistics into the persisted state kept
// until close(). In chunked (non-single) write mode, the min/max string
// statistics point into device memory the caller may free between write()
// calls, so the strings are first copied into an owned device pool and the
// chunk entries are re-pointed at that pool (via copy_string_data).
// Fix: removed scraped review-UI noise lines that were spliced into the body.
void writer::impl::persisted_statistics::persist(int num_table_rows,
                                                 bool single_write_mode,
                                                 intermediate_statistics& intermediate_stats,
                                                 rmm::cuda_stream_view stream)
{
  if (not single_write_mode) {
    // persist the strings in the chunks into a string pool and update pointers
    auto const num_chunks = static_cast<int>(intermediate_stats.stripe_stat_chunks.size());
    // one min offset and one max offset per chunk, + 1 slot for the total size
    rmm::device_uvector<size_type> offsets((num_chunks * 2) + 1, stream);

    auto iter = cudf::detail::make_counting_transform_iterator(
      0,
      string_length_functor{num_chunks,
                            intermediate_stats.stripe_stat_chunks.data(),
                            intermediate_stats.stripe_stat_merge.device_ptr()});
    thrust::exclusive_scan(
      rmm::exec_policy(stream), iter, iter + (num_chunks * 2) + 1, offsets.begin());

    // pull size back to host
    auto const total_string_pool_size = offsets.element(num_chunks * 2, stream);
    if (total_string_pool_size > 0) {
      rmm::device_uvector<char> string_pool(total_string_pool_size, stream);

      // offsets describes where in the string pool each string goes. Going with the simple
      // approach for now, but it is possible something fancier with breaking up each thread into
      // copying x bytes instead of a single string is the better method since we are dealing in
      // min/max strings they almost certainly will not be uniform length.
      copy_string_data<<<num_chunks * 2, 256, 0, stream.value()>>>(
        string_pool.data(),
        offsets.data(),
        intermediate_stats.stripe_stat_chunks.data(),
        intermediate_stats.stripe_stat_merge.device_ptr());
      string_pools.emplace_back(std::move(string_pool));
    }
  }

  stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks));
  stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge));
  stats_dtypes = std::move(intermediate_stats.stats_dtypes);
  col_types    = std::move(intermediate_stats.col_types);
  num_rows     = num_table_rows;
}

void writer::impl::write(table_view const& table)
{
CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed");
Expand Down Expand Up @@ -2073,13 +2165,8 @@ void writer::impl::write(table_view const& table)
auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation);

if (intermediate_stats.stripe_stat_chunks.size() > 0) {
persisted_stripe_statistics.stripe_stat_chunks.emplace_back(
std::move(intermediate_stats.stripe_stat_chunks));
persisted_stripe_statistics.stripe_stat_merge.emplace_back(
std::move(intermediate_stats.stripe_stat_merge));
persisted_stripe_statistics.stats_dtypes = std::move(intermediate_stats.stats_dtypes);
persisted_stripe_statistics.col_types = std::move(intermediate_stats.col_types);
persisted_stripe_statistics.num_rows = orc_table.num_rows();
persisted_stripe_statistics.persist(
orc_table.num_rows(), single_write_mode, intermediate_stats, stream);
}

// Write stripes
Expand Down Expand Up @@ -2139,7 +2226,6 @@ void writer::impl::write(table_view const& table)
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}

for (auto const& task : write_tasks) {
task.wait();
}
Expand Down Expand Up @@ -2202,7 +2288,7 @@ void writer::impl::close()
auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics);

// File-level statistics
if (single_write_mode and not statistics.file_level.empty()) {
if (not statistics.file_level.empty()) {
buffer_.resize(0);
pbw_.put_uint(encode_field_number<size_type>(1));
pbw_.put_uint(persisted_stripe_statistics.num_rows);
Expand Down
Loading