Skip to content

Commit

Permalink
IO statistics cleanup (#8191)
Browse files Browse the repository at this point in the history
Addresses #6920 

Use type dispatched functors to calculate statistics in Parquet and ORC.

Authors:
  - Kumar Aatish (https://github.com/kaatish)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Devavret Makkar (https://github.com/devavret)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Ashwin Srinath (https://github.com/shwina)
  - Michael Wang (https://github.com/isVoid)

URL: #8191
  • Loading branch information
kaatish authored May 26, 2021
1 parent ddba88d commit 24e05a0
Show file tree
Hide file tree
Showing 16 changed files with 1,210 additions and 629 deletions.
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ add_library(cudf
src/io/parquet/parquet.cpp
src/io/parquet/reader_impl.cu
src/io/parquet/writer_impl.cu
src/io/statistics/column_stats.cu
src/io/statistics/orc_column_statistics.cu
src/io/statistics/parquet_column_statistics.cu
src/io/utilities/column_buffer.cpp
src/io/utilities/data_sink.cpp
src/io/utilities/datasource.cpp
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include "timezone.cuh"

#include <io/comp/gpuinflate.h>
#include <io/statistics/column_stats.h>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>
#include <io/statistics/statistics.cuh>
#include <io/utilities/column_buffer.hpp>
#include "orc_common.h"

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -311,7 +311,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// }
if (s->chunk.has_sum) { // Sum is equal to the number of 'true' values
cur[0] = 5 * 8 + PB_TYPE_FIXEDLEN;
cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.i_val);
cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val);
fld_start[1] = cur - (fld_start + 2);
}
break;
Expand Down
27 changes: 15 additions & 12 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "writer_impl.hpp"

#include <io/statistics/column_statistics.cuh>
#include <io/utilities/column_utils.cuh>

#include <cudf/detail/iterator.cuh>
Expand Down Expand Up @@ -851,18 +852,20 @@ std::vector<std::vector<uint8_t>> writer::impl::gather_statistic_blobs(
row_index_stride_,
stream);

GatherColumnStatistics(stat_chunks.data(), stat_groups.data(), num_chunks, stream);
MergeColumnStatistics(stat_chunks.data() + num_chunks,
stat_chunks.data(),
stat_merge.device_ptr(),
stripe_bounds.size() * columns.size(),
stream);

MergeColumnStatistics(stat_chunks.data() + num_chunks + stripe_bounds.size() * columns.size(),
stat_chunks.data() + num_chunks,
stat_merge.device_ptr(stripe_bounds.size() * columns.size()),
columns.size(),
stream);
detail::calculate_group_statistics<detail::io_file_format::ORC>(
stat_chunks.data(), stat_groups.data(), num_chunks, stream);
detail::merge_group_statistics<detail::io_file_format::ORC>(stat_chunks.data() + num_chunks,
stat_chunks.data(),
stat_merge.device_ptr(),
stripe_bounds.size() * columns.size(),
stream);

detail::merge_group_statistics<detail::io_file_format::ORC>(
stat_chunks.data() + num_chunks + stripe_bounds.size() * columns.size(),
stat_chunks.data() + num_chunks,
stat_merge.device_ptr(stripe_bounds.size() * columns.size()),
columns.size(),
stream);
gpu::orc_init_statistics_buffersize(
stat_merge.device_ptr(), stat_chunks.data() + num_chunks, num_stat_blobs, stream);
stat_merge.device_to_host(stream, true);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "io/comp/gpuinflate.h"
#include "io/parquet/parquet_common.hpp"
#include "io/statistics/column_stats.h"
#include "io/statistics/statistics.cuh"
#include "io/utilities/column_buffer.hpp"
#include "io/utilities/hostdevice_vector.hpp"

Expand Down
17 changes: 10 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @brief cuDF-IO parquet writer class implementation
*/

#include <io/statistics/column_statistics.cuh>
#include "writer_impl.hpp"

#include <io/utilities/column_utils.cuh>
Expand Down Expand Up @@ -738,7 +739,7 @@ void writer::impl::gather_fragment_statistics(
device_2dspan<statistics_group>(frag_stats_group.data(), num_columns, num_fragments);

gpu::InitFragmentStatistics(frag_stats_group_2dview, frag, col_desc, stream);
GatherColumnStatistics(
detail::calculate_group_statistics<detail::io_file_format::PARQUET>(
frag_stats_chunk.data(), frag_stats_group.data(), num_fragments * num_columns, stream);
stream.synchronize();
}
Expand Down Expand Up @@ -780,13 +781,15 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk> &
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
stream);
if (num_stats_bfr > 0) {
MergeColumnStatistics(page_stats, frag_stats, page_stats_mrg.data(), num_pages, stream);
detail::merge_group_statistics<detail::io_file_format::PARQUET>(
page_stats, frag_stats, page_stats_mrg.data(), num_pages, stream);
if (num_stats_bfr > num_pages) {
MergeColumnStatistics(page_stats + num_pages,
page_stats,
page_stats_mrg.data() + num_pages,
num_stats_bfr - num_pages,
stream);
detail::merge_group_statistics<detail::io_file_format::PARQUET>(
page_stats + num_pages,
page_stats,
page_stats_mrg.data() + num_pages,
num_stats_bfr - num_pages,
stream);
}
}
stream.synchronize();
Expand Down
Loading

0 comments on commit 24e05a0

Please sign in to comment.