Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/branch-22.04' into jni-jpointera…
Browse files Browse the repository at this point in the history
…rray-fix
  • Loading branch information
mythrocks committed Jan 24, 2022
2 parents abcca8a + cfb6cbe commit e6a3ffd
Show file tree
Hide file tree
Showing 23 changed files with 247 additions and 96 deletions.
2 changes: 1 addition & 1 deletion ci/benchmark/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export GBENCH_BENCHMARKS_DIR="$WORKSPACE/cpp/build/gbenchmarks/"
export LIBCUDF_KERNEL_CACHE_PATH="$HOME/.jitify-cache"

# Dask & Distributed git tag
export DASK_DISTRIBUTED_GIT_TAG='main'
export DASK_DISTRIBUTED_GIT_TAG='2022.01.0'

function remove_libcudf_kernel_cache_dir {
EXITCODE=$?
Expand Down
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export GIT_DESCRIBE_TAG=`git describe --tags`
export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`

# Dask & Distributed git tag
export DASK_DISTRIBUTED_GIT_TAG='main'
export DASK_DISTRIBUTED_GIT_TAG='2022.01.0'

# ucx-py version
export UCX_PY_VERSION='0.25.*'
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/cudf_dev_cuda11.5.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ dependencies:
- pydocstyle=6.1.1
- typing_extensions
- pre-commit
- dask>=2021.11.1
- distributed>=2021.11.1
- dask>=2021.11.1,<=2022.01.0
- distributed>=2021.11.1,<=2022.01.0
- streamz
- arrow-cpp=5.0.0
- dlpack>=0.5,<0.6.0a0
Expand Down
4 changes: 2 additions & 2 deletions conda/recipes/custreamz/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ requirements:
- python
- streamz
- cudf {{ version }}
- dask>=2021.11.1,<=2021.11.2
- distributed>=2021.11.1,<=2021.11.2
- dask>=2021.11.1,<=2022.01.0
- distributed>=2021.11.1,<=2022.01.0
- python-confluent-kafka >=1.7.0,<1.8.0a0
- cudf_kafka {{ version }}

Expand Down
8 changes: 4 additions & 4 deletions conda/recipes/dask-cudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ requirements:
host:
- python
- cudf {{ version }}
- dask>=2021.11.1
- distributed>=2021.11.1
- dask>=2021.11.1,<=2022.01.0
- distributed>=2021.11.1,<=2022.01.0
- cudatoolkit {{ cuda_version }}
run:
- python
- cudf {{ version }}
- dask>=2021.11.1
- distributed>=2021.11.1
- dask>=2021.11.1,<=2022.01.0
- distributed>=2021.11.1,<=2022.01.0
- {{ pin_compatible('cudatoolkit', max_pin='x', min_pin='x') }}

test: # [linux64]
Expand Down
19 changes: 14 additions & 5 deletions cpp/benchmarks/io/orc/orc_writer_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "cudf/io/types.hpp"
#include <benchmark/benchmark.h>

#include <benchmarks/common/generate_benchmark_input.hpp>
Expand Down Expand Up @@ -65,8 +66,14 @@ void BM_orc_write_varying_inout(benchmark::State& state)

void BM_orc_write_varying_options(benchmark::State& state)
{
auto const compression = static_cast<cudf::io::compression_type>(state.range(0));
auto const enable_stats = state.range(1) != 0;
auto const compression = static_cast<cudf::io::compression_type>(state.range(0));
auto const stats_freq = [&] {
switch (state.range(2)) {
case 0: return cudf::io::STATISTICS_NONE;
case 1: return cudf::io::ORC_STATISTICS_STRIPE;
default: return cudf::io::ORC_STATISTICS_ROW_GROUP;
}
}();

auto const data_types = get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED),
int32_t(type_group_id::FLOATING_POINT),
Expand All @@ -85,7 +92,7 @@ void BM_orc_write_varying_options(benchmark::State& state)
cudf_io::orc_writer_options const options =
cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view)
.compression(compression)
.enable_statistics(enable_stats);
.enable_statistics(stats_freq);
cudf_io::write_orc(options);
}

Expand Down Expand Up @@ -113,6 +120,8 @@ BENCHMARK_DEFINE_F(OrcWrite, writer_options)
BENCHMARK_REGISTER_F(OrcWrite, writer_options)
->ArgsProduct({{int32_t(cudf::io::compression_type::NONE),
int32_t(cudf::io::compression_type::SNAPPY)},
{0, 1}})
{int32_t{cudf::io::STATISTICS_NONE},
int32_t{cudf::io::ORC_STATISTICS_STRIPE},
int32_t{cudf::io::ORC_STATISTICS_ROW_GROUP}}})
->Unit(benchmark::kMillisecond)
->UseManualTime();
20 changes: 13 additions & 7 deletions cpp/docs/BENCHMARKING.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
# Unit Benchmarking in libcudf

Unit benchmarks in libcudf are written using [Google Benchmark](https://github.com/google/benchmark).
Unit benchmarks in libcudf are written using [NVBench](https://github.com/NVIDIA/nvbench).
While many existing benchmarks are written using
[Google Benchmark](https://github.com/google/benchmark), new benchmarks should use NVBench.

Google Benchmark provides many options for specifying ranges of parameters to benchmarks to test
with varying parameters, as well as to control the time unit reported, among other options. Refer to
other benchmarks in `cpp/benchmarks` to understand the options.
The NVBench library is similar to Google Benchmark, but has several quality of life improvements
when doing GPU benchmarking such as displaying the fraction of peak memory bandwidth achieved and
details about the GPU hardware.

Both NVBench and Google Benchmark provide many options for specifying ranges of parameters to
benchmark, as well as to control the time unit reported, among other options. Refer to existing
benchmarks in `cpp/benchmarks` to understand the options.

## Directory and File Naming

The naming of unit benchmark directories and source files should be consistent with the feature
being benchmarked. For example, the benchmarks for APIs in `copying.hpp` should live in
`cudf/cpp/benchmarks/copying`. Each feature (or set of related features) should have its own
`cpp/benchmarks/copying`. Each feature (or set of related features) should have its own
benchmark source file named `<feature>_benchmark.cu/cpp`. For example,
`cudf/cpp/src/copying/scatter.cu` has benchmarks in
`cudf/cpp/benchmarks/copying/scatter_benchmark.cu`.
`cpp/src/copying/scatter.cu` has benchmarks in
`cpp/benchmarks/copying/scatter_benchmark.cu`.

In the interest of improving compile time, whenever possible, test source files should be `.cpp`
files because `nvcc` is slower than `gcc` in compiling host code. Note that `thrust::device_vector`
Expand Down
84 changes: 62 additions & 22 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -434,6 +434,18 @@ table_with_metadata read_orc(
*/
class orc_writer_options_builder;

/**
* @brief Constants to disambiguate statistics terminology for ORC.
*
* ORC refers to its finest granularity of row-grouping as "row group",
* which corresponds to Parquet "pages".
* Similarly, ORC's "stripe" corresponds to a Parquet "row group".
* The following constants disambiguate the terminology for the statistics
* collected at each level.
*/
static constexpr statistics_freq ORC_STATISTICS_STRIPE = statistics_freq::STATISTICS_ROWGROUP;
static constexpr statistics_freq ORC_STATISTICS_ROW_GROUP = statistics_freq::STATISTICS_PAGE;

/**
* @brief Settings to use for `write_orc()`.
*/
Expand All @@ -442,8 +454,8 @@ class orc_writer_options {
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
// Enable writing column statistics
bool _enable_statistics = true;
// Specify frequency of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
size_t _stripe_size_bytes = default_stripe_size_bytes;
// Maximum number of rows in stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -501,7 +513,15 @@ class orc_writer_options {
/**
* @brief Whether writing column statistics is enabled/disabled.
*/
[[nodiscard]] bool is_enabled_statistics() const { return _enable_statistics; }
[[nodiscard]] bool is_enabled_statistics() const
{
return _stats_freq != statistics_freq::STATISTICS_NONE;
}

/**
* @brief Returns frequency of statistics collection.
*/
[[nodiscard]] statistics_freq get_statistics_freq() const { return _stats_freq; }

/**
* @brief Returns maximum stripe size, in bytes.
Expand Down Expand Up @@ -550,11 +570,16 @@ class orc_writer_options {
void set_compression(compression_type comp) { _compression = comp; }

/**
* @brief Enable/Disable writing column statistics.
* @brief Choose granularity of statistics collection.
*
* @param val Boolean value to enable/disable statistics.
* The granularity can be set to:
* - cudf::io::STATISTICS_NONE: No statistics are collected.
* - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe.
* - cudf::io::ORC_STATISTICS_ROW_GROUP: Statistics are collected for each ORC row group.
*
* @param val Frequency of statistics collection.
*/
void enable_statistics(bool val) { _enable_statistics = val; }
void enable_statistics(statistics_freq val) { _stats_freq = val; }

/**
* @brief Sets the maximum stripe size, in bytes.
Expand Down Expand Up @@ -647,14 +672,19 @@ class orc_writer_options_builder {
}

/**
* @brief Enable/Disable writing column statistics.
* @brief Choose granularity of column statistics to be written
*
* The granularity can be set to:
* - cudf::io::STATISTICS_NONE: No statistics are collected.
* - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe.
* - cudf::io::ORC_STATISTICS_ROW_GROUP: Statistics are collected for each ORC row group.
*
* @param val Boolean value to enable/disable.
* @param val Level of statistics collection.
* @return this for chaining.
*/
orc_writer_options_builder& enable_statistics(bool val)
orc_writer_options_builder& enable_statistics(statistics_freq val)
{
options._enable_statistics = val;
options._stats_freq = val;
return *this;
}

Expand Down Expand Up @@ -775,8 +805,8 @@ class chunked_orc_writer_options {
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
// Enable writing column statistics
bool _enable_statistics = true;
// Specify granularity of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
size_t _stripe_size_bytes = default_stripe_size_bytes;
// Maximum number of rows in stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -825,9 +855,9 @@ class chunked_orc_writer_options {
[[nodiscard]] compression_type get_compression() const { return _compression; }

/**
* @brief Whether writing column statistics is enabled/disabled.
* @brief Returns granularity of statistics collection.
*/
[[nodiscard]] bool is_enabled_statistics() const { return _enable_statistics; }
[[nodiscard]] statistics_freq get_statistics_freq() const { return _stats_freq; }

/**
* @brief Returns maximum stripe size, in bytes.
Expand Down Expand Up @@ -871,11 +901,16 @@ class chunked_orc_writer_options {
void set_compression(compression_type comp) { _compression = comp; }

/**
* @brief Enable/Disable writing column statistics.
* @brief Choose granularity of statistics collection
*
* The granularity can be set to:
* - cudf::io::STATISTICS_NONE: No statistics are collected.
* - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe.
* - cudf::io::ORC_STATISTICS_ROW_GROUP: Statistics are collected for each ORC row group.
*
* @param val Boolean value to enable/disable.
* @param val Frequency of statistics collection.
*/
void enable_statistics(bool val) { _enable_statistics = val; }
void enable_statistics(statistics_freq val) { _stats_freq = val; }

/**
* @brief Sets the maximum stripe size, in bytes.
Expand Down Expand Up @@ -958,14 +993,19 @@ class chunked_orc_writer_options_builder {
}

/**
* @brief Enable/Disable writing column statistics.
* @brief Choose granularity of statistics collection
*
* The granularity can be set to:
* - cudf::io::STATISTICS_NONE: No statistics are collected.
* - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe.
* - cudf::io::ORC_STATISTICS_ROW_GROUP: Statistics are collected for each ORC row group.
*
* @param val Boolean value to enable/disable.
* @param val Frequency of statistics collection.
* @return this for chaining.
*/
chunked_orc_writer_options_builder& enable_statistics(bool val)
chunked_orc_writer_options_builder& enable_statistics(statistics_freq val)
{
options._enable_statistics = val;
options._stats_freq = val;
return *this;
}

Expand Down
44 changes: 27 additions & 17 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1063,15 +1063,15 @@ void set_stat_desc_leaf_cols(device_span<orc_column_device_view const> columns,
}

writer::impl::encoded_statistics writer::impl::gather_statistic_blobs(
bool are_statistics_enabled,
statistics_freq stats_freq,
orc_table_view const& orc_table,
file_segmentation const& segmentation)
{
auto const num_rowgroup_blobs = segmentation.rowgroups.count();
auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns();
auto const num_file_blobs = orc_table.num_columns();
auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs;

auto const num_rowgroup_blobs = segmentation.rowgroups.count();
auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns();
auto const num_file_blobs = orc_table.num_columns();
auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs;
auto const are_statistics_enabled = stats_freq != statistics_freq::STATISTICS_NONE;
if (not are_statistics_enabled or num_stat_blobs == 0) { return {}; }

hostdevice_vector<stats_column_desc> stat_desc(orc_table.num_columns(), stream);
Expand Down Expand Up @@ -1164,17 +1164,27 @@ writer::impl::encoded_statistics writer::impl::gather_statistic_blobs(

hostdevice_vector<uint8_t> blobs(
stat_merge[num_stat_blobs - 1].start_chunk + stat_merge[num_stat_blobs - 1].num_chunks, stream);
gpu::orc_encode_statistics(
blobs.device_ptr(), stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream);
// Skip rowgroup blobs when encoding, if chosen granularity is coarser than "ROW_GROUP".
auto const is_granularity_rowgroup = stats_freq == ORC_STATISTICS_ROW_GROUP;
auto const num_skip = is_granularity_rowgroup ? 0 : num_rowgroup_blobs;
gpu::orc_encode_statistics(blobs.device_ptr(),
stat_merge.device_ptr(num_skip),
stat_chunks.data() + num_skip,
num_stat_blobs - num_skip,
stream);
stat_merge.device_to_host(stream);
blobs.device_to_host(stream, true);

std::vector<ColStatsBlob> rowgroup_blobs(num_rowgroup_blobs);
for (size_t i = 0; i < num_rowgroup_blobs; i++) {
auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks;
rowgroup_blobs[i].assign(stat_begin, stat_end);
}
auto rowgroup_blobs = [&]() -> std::vector<ColStatsBlob> {
if (not is_granularity_rowgroup) { return {}; }
std::vector<ColStatsBlob> rowgroup_blobs(num_rowgroup_blobs);
for (size_t i = 0; i < num_rowgroup_blobs; i++) {
auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks;
rowgroup_blobs[i].assign(stat_begin, stat_end);
}
return rowgroup_blobs;
}();

std::vector<ColStatsBlob> stripe_blobs(num_stripe_blobs);
for (size_t i = 0; i < num_stripe_blobs; i++) {
Expand Down Expand Up @@ -1351,7 +1361,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.is_enabled_statistics()),
stats_freq_(options.get_statistics_freq()),
single_write_mode(mode == SingleWriteMode::YES),
kv_meta(options.get_key_value_metadata()),
out_sink_(std::move(sink))
Expand All @@ -1372,7 +1382,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
row_index_stride{options.get_row_index_stride()},
compression_kind_(to_orc_compression(options.get_compression())),
enable_statistics_(options.is_enabled_statistics()),
stats_freq_(options.get_statistics_freq()),
single_write_mode(mode == SingleWriteMode::YES),
kv_meta(options.get_key_value_metadata()),
out_sink_(std::move(sink))
Expand Down Expand Up @@ -1954,7 +1964,7 @@ void writer::impl::write(table_view const& table)

ProtobufWriter pbw_(&buffer_);

auto const statistics = gather_statistic_blobs(enable_statistics_, orc_table, segmentation);
auto const statistics = gather_statistic_blobs(stats_freq_, orc_table, segmentation);

// Write stripes
std::vector<std::future<void>> write_tasks;
Expand Down
Loading

0 comments on commit e6a3ffd

Please sign in to comment.