Skip to content

Commit

Permalink
[C++][Parquet] Improve performance of generating size statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 8, 2025
1 parent 0804ba6 commit 8a4ffbc
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 36 deletions.
53 changes: 23 additions & 30 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1468,42 +1468,43 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// which case we call back to the dense write path)
std::shared_ptr<::arrow::Array> preserved_dictionary_;

// Write definition and repetition levels for `num_levels` leaf slots.
// Updates the page-level size-statistics histograms and the buffered row
// counters, and returns the number of non-null leaf values the caller must
// actually encode.
int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
                    const int16_t* rep_levels) {
  // Update histograms now, to maximize cache efficiency: the level buffers
  // are still hot in cache before the encoding passes below re-scan them.
  UpdateLevelHistogram(num_levels, def_levels, rep_levels);

  int64_t values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    // Only slots at the maximum definition level carry an actual value.
    for (int64_t i = 0; i < num_levels; ++i) {
      if (def_levels[i] == descr_->max_definition_level()) {
        ++values_to_write;
      }
    }

    WriteDefinitionLevels(num_levels, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_levels;
  }

  // Not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value
    // Count the occasions where we start a new row
    for (int64_t i = 0; i < num_levels; ++i) {
      if (rep_levels[i] == 0) {
        rows_written_++;
        num_buffered_rows_++;
      }
    }

    WriteRepetitionLevels(num_levels, rep_levels);
  } else {
    // Each value is exactly one row
    rows_written_ += num_levels;
    num_buffered_rows_ += num_levels;
  }

  return values_to_write;
}

Expand Down Expand Up @@ -1575,6 +1576,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<

void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
// Update histograms now, to maximize cache efficiency.
UpdateLevelHistogram(num_levels, def_levels, rep_levels);

// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
WriteDefinitionLevels(num_levels, def_levels);
Expand All @@ -1595,8 +1599,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}

UpdateLevelHistogram(num_levels, def_levels, rep_levels);
}

void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
Expand All @@ -1606,26 +1608,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
}

auto add_levels = [](std::vector<int64_t>& level_histogram,
::arrow::util::span<const int16_t> levels) {
for (int16_t level : levels) {
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
++level_histogram[level];
}
::arrow::util::span<const int16_t> levels, int16_t max_level) {
ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
::parquet::UpdateLevelHistogram(levels, level_histogram);
};

if (descr_->max_definition_level() > 0) {
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->definition_level_histogram[0] += num_levels;
}

if (descr_->max_repetition_level() > 0) {
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->repetition_level_histogram[0] += num_levels;
}
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)},
descr_->max_definition_level());
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)},
descr_->max_repetition_level());
}

// Update the unencoded data bytes for ByteArray only per the specification.
Expand Down
70 changes: 64 additions & 6 deletions cpp/src/parquet/size_statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@

namespace parquet {

namespace {

// Element-wise accumulate `other` into `histogram`; both spans must have the
// same length (checked in debug builds only).
void MergeLevelHistogram(::arrow::util::span<int64_t> histogram,
                         ::arrow::util::span<const int64_t> other) {
  ARROW_DCHECK_EQ(histogram.size(), other.size());
  const size_t num_buckets = histogram.size();
  for (size_t bucket = 0; bucket < num_buckets; ++bucket) {
    histogram[bucket] += other[bucket];
  }
}

} // namespace

void SizeStatistics::Merge(const SizeStatistics& other) {
if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
throw ParquetException("Repetition level histogram size mismatch");
Expand All @@ -36,12 +47,8 @@ void SizeStatistics::Merge(const SizeStatistics& other) {
other.unencoded_byte_array_data_bytes.has_value()) {
throw ParquetException("Unencoded byte array data bytes are not consistent");
}
std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(),
other.repetition_level_histogram.begin(),
repetition_level_histogram.begin(), std::plus<>());
std::transform(definition_level_histogram.begin(), definition_level_histogram.end(),
other.definition_level_histogram.begin(),
definition_level_histogram.begin(), std::plus<>());
MergeLevelHistogram(repetition_level_histogram, other.repetition_level_histogram);
MergeLevelHistogram(definition_level_histogram, other.definition_level_histogram);
if (unencoded_byte_array_data_bytes.has_value()) {
unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
other.unencoded_byte_array_data_bytes.value();
Expand Down Expand Up @@ -91,4 +98,55 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* des
return size_stats;
}

// Accumulate the number of occurrences of each level value in `levels` into
// `histogram`, where `histogram[i]` counts entries equal to `i`.
// `histogram` must have exactly `max_level + 1` buckets; levels above
// `max_level` are only caught by debug assertions.
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
                          ::arrow::util::span<int64_t> histogram) {
  const int64_t num_levels = static_cast<int64_t>(levels.size());
  const int16_t max_level = static_cast<int16_t>(histogram.size() - 1);
  // With max_level == 0 every level is necessarily zero, so the whole batch
  // collapses into a single bucket increment without reading `levels`.
  if (max_level == 0) {
    histogram[0] += num_levels;
    return;
  }
  // The goal of the two specialized paths below is to accelerate common cases
  // by keeping histogram values in registers.
  // The fallback implementation (`++histogram[level]`) issues a series of
  // load-stores with frequent conflicts.
  if (max_level == 1) {
    // Specialize the common case for non-repeated non-nested columns
    // by keeping histogram values in a register, which avoids being limited
    // by CPU cache latency.
    int64_t hist0 = 0;
    for (int16_t level : levels) {
      ARROW_DCHECK_LE(level, max_level);
      hist0 += (level == 0);
    }
    // max_level is usually the most frequent case, update it in one single step
    histogram[1] += num_levels - hist0;
    histogram[0] += hist0;
    return;
  }

  // The general case cannot avoid repeated loads/stores in the CPU cache,
  // but it limits store-to-load dependencies by interleaving partial histogram
  // updates.
  constexpr int kUnroll = 4;
  // One zero-initialized scratch histogram per unrolled lane; they are summed
  // into `histogram` at the end.
  std::array<std::vector<int64_t>, kUnroll> partial_hist;
  for (auto& hist : partial_hist) {
    hist.assign(histogram.size(), 0);
  }
  int64_t i = 0;
  // Main unrolled loop: lane j updates its own partial histogram, so the four
  // increments per iteration do not depend on each other's stores.
  for (; i <= num_levels - kUnroll; i += kUnroll) {
    for (int j = 0; j < kUnroll; ++j) {
      ARROW_DCHECK_LE(levels[i + j], max_level);
      ++partial_hist[j][levels[i + j]];
    }
  }
  // Scalar epilogue for the trailing (num_levels % kUnroll) entries.
  for (; i < num_levels; ++i) {
    ARROW_DCHECK_LE(levels[i], max_level);
    ++partial_hist[0][levels[i]];
  }
  // Reduce the per-lane partial histograms into the caller's histogram.
  for (const auto& hist : partial_hist) {
    MergeLevelHistogram(histogram, hist);
  }
}

} // namespace parquet
6 changes: 6 additions & 0 deletions cpp/src/parquet/size_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#pragma once

#include <cstdint>
#include <optional>
#include <vector>

#include "arrow/util/span.h"
#include "parquet/platform.h"
#include "parquet/type_fwd.h"

Expand Down Expand Up @@ -89,4 +91,8 @@ struct PARQUET_EXPORT SizeStatistics {
static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
};

PARQUET_EXPORT
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
::arrow::util::span<int64_t> histogram);

} // namespace parquet
2 changes: 2 additions & 0 deletions cpp/src/parquet/size_statistics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,6 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
/*byte_array_bytes=*/{4}}));
}

// TODO: add unit tests for UpdateLevelHistogram covering the max_level == 0
// and max_level == 1 fast paths as well as the unrolled general case
// (including input lengths that are not a multiple of the unroll factor).

} // namespace parquet

0 comments on commit 8a4ffbc

Please sign in to comment.