Skip to content

Commit

Permalink
Billion row challenge speedup (#1584)
Browse files Browse the repository at this point in the history
After establishing that deallocating segments was a bottleneck when
scaling the billion row challenge out to many cores, we've decided to
move to using [mimalloc
everywhere](#1577).
Using `LD_PRELOAD` with mimalloc, these optimisations further speed up
running the billion row challenge (run on a 64 core machine with
hyperthreading):
```
Cores master brc-speedup
1     76.47  61.39
2     40.10  33.79
4     18.70  16.58
8     10.11   8.68
16     6.83   6.44
32     4.78   5.17
64     5.41   5.15
```
This shows that scaling is good out to 8 cores, and drops off after
that. Logging timings shows an obvious bottleneck in `gather_entities`
within `AggregationClause::process`, which will be addressed in a
[future ticket](#1586) to
avoid conflicts with #1495.
  • Loading branch information
alexowens90 authored May 31, 2024
1 parent 81ee498 commit d601836
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 293 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ class SegmentInMemory {
return SegmentInMemory(impl_->truncate(start_row, end_row, reconstruct_string_pool));
}

std::vector<SegmentInMemory> partition(const std::vector<std::optional<uint8_t>>& row_to_segment,
std::vector<SegmentInMemory> partition(const std::vector<uint8_t>& row_to_segment,
const std::vector<uint64_t>& segment_counts) const{
std::vector<SegmentInMemory> res;
auto impls = impl_->partition(row_to_segment, segment_counts);
Expand Down
28 changes: 11 additions & 17 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::get_output_segment(siz
return output;
}

std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::partition(const std::vector<std::optional<uint8_t>>& row_to_segment,
std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::partition(const std::vector<uint8_t>& row_to_segment,
const std::vector<uint64_t>& segment_counts) const {
schema::check<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>(!is_sparse(),
"SegmentInMemory::partition not supported with sparse columns");
Expand All @@ -374,28 +374,22 @@ std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::partition
}

for(const auto& column : folly::enumerate(columns())) {
(*column)->type().visit_tag([&] (auto type_desc_tag){
using TypeDescriptorTag = decltype(type_desc_tag);
using ColumnTagType = typename TypeDescriptorTag::DataTypeTag;
using RawType = typename ColumnTagType::raw_type;
details::visit_type((*column)->type().data_type(), [&](auto col_tag) {
using type_info = ScalarTypeInfo<decltype(col_tag)>;

auto output_col_idx = column.index;
std::vector<RawType*> output_ptrs{output.size(), nullptr};
std::vector<typename type_info::RawType*> output_ptrs{output.size(), nullptr};
for (const auto& segment: folly::enumerate(output)) {
if (static_cast<bool>(*segment)) {
output_ptrs.at(segment.index) = reinterpret_cast<RawType*>((*segment)->column(output_col_idx).ptr());
output_ptrs.at(segment.index) = reinterpret_cast<typename type_info::RawType*>((*segment)->column(column.index).ptr());
}
}

auto input_data = (*column)->data();
size_t overall_idx = 0;
while(auto block = input_data.next<TypeDescriptorTag>()) {
auto input_ptr = reinterpret_cast<const RawType*>(block.value().data());
for (size_t block_idx = 0; block_idx < block.value().row_count(); ++block_idx, ++input_ptr, ++overall_idx) {
auto opt_output_segment_idx = row_to_segment[overall_idx];
if (opt_output_segment_idx.has_value()) {
*(output_ptrs[*opt_output_segment_idx]++) = *input_ptr;
}
auto row_to_segment_it = row_to_segment.cbegin();
auto input_data = (*column)->data();
auto cend = input_data.cend<typename type_info::TDT>();
for (auto input_it = input_data.cbegin<typename type_info::TDT>(); input_it != cend; ++input_it, ++row_to_segment_it) {
if (ARCTICDB_LIKELY(*row_to_segment_it != std::numeric_limits<uint8_t>::max())) {
*(output_ptrs[*row_to_segment_it]++) = *input_it;
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ class SegmentInMemoryImpl {
// by the row_to_segment vector (std::numeric_limits<uint8_t>::max() means the row is not included in any output segment).
// segment_counts has one entry per output segment, so its length should be greater than the max valid value
// in row_to_segment
std::vector<std::shared_ptr<SegmentInMemoryImpl>> partition(const std::vector<std::optional<uint8_t>>& row_to_segment,
std::vector<std::shared_ptr<SegmentInMemoryImpl>> partition(const std::vector<uint8_t>& row_to_segment,
const std::vector<uint64_t>& segment_counts) const;

std::vector<std::shared_ptr<SegmentInMemoryImpl>> split(size_t rows) const;
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/filter_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inline SegmentInMemory filter_segment(const SegmentInMemory& input,
}

inline std::vector<SegmentInMemory> partition_segment(const SegmentInMemory& input,
const std::vector<std::optional<uint8_t>>& row_to_segment,
const std::vector<uint8_t>& row_to_segment,
const std::vector<uint64_t>& segment_counts) {
return input.partition(row_to_segment, segment_counts);
}
Expand Down
Loading

0 comments on commit d601836

Please sign in to comment.