Skip to content

Commit

Permalink
apacheGH-43953: [C++] Add tests based on random data and benchmarks t…
Browse files Browse the repository at this point in the history
…o ChunkResolver::ResolveMany (apache#43954)

### Rationale for this change

Improve tests and add benchmarks. I wrote the tests and benchmarks while trying to improve the performance of `ResolveMany` and failing at it.

### What changes are included in this PR?

Tests, benchmarks, and changes that don't really affect performance but might unlock more optimization opportunities in the future.

### Are these changes tested?

Yes.

* GitHub Issue: apache#43953

Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
felipecrv and pitrou authored Sep 24, 2024
1 parent 508eb2f commit 83f35de
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 74 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ add_arrow_test(sparse_tensor_test)
add_arrow_test(stl_test SOURCES stl_iterator_test.cc stl_test.cc)

add_arrow_benchmark(builder_benchmark)
add_arrow_benchmark(chunk_resolver_benchmark)
add_arrow_benchmark(compare_benchmark)
add_arrow_benchmark(memory_pool_benchmark)
add_arrow_benchmark(type_benchmark)
Expand Down
104 changes: 59 additions & 45 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,43 +55,57 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
return offsets;
}

template <typename IndexType>
inline TypedChunkLocation<IndexType> ResolveOneInline(uint32_t num_offsets,
const uint64_t* offsets,
IndexType typed_logical_index,
int32_t num_chunks,
int32_t chunk_hint) {
const auto index = static_cast<uint64_t>(typed_logical_index);
// use or update chunk_hint
if (index >= offsets[chunk_hint] &&
(chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
// hint is correct!
} else {
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<int32_t>(chunk_index);
}
// chunk_index is in [0, chunks.size()] no matter what the value
// of logical_index is, so it's always safe to dereference offsets
// as it contains chunks.size()+1 values.
auto loc = TypedChunkLocation<IndexType>(
/*chunk_index=*/chunk_hint,
/*index_in_chunk=*/typed_logical_index -
static_cast<IndexType>(offsets[chunk_hint]));
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning the index-in-chunk value when the logical
// index is out-of-bounds.
if (static_cast<int32_t>(loc.chunk_index) == num_chunks) {
loc.index_in_chunk = std::numeric_limits<IndexType>::max();
}
#endif
return loc;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
template <typename IndexType>
void ResolveManyInline(size_t num_offsets, const int64_t* signed_offsets,
void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
int64_t n_indices, const IndexType* logical_index_vec,
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint) {
int32_t chunk_hint) {
auto* offsets = reinterpret_cast<const uint64_t*>(signed_offsets);
const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
const auto num_chunks = static_cast<int32_t>(num_offsets - 1);
// chunk_hint in [0, num_offsets) per the precondition.
for (int64_t i = 0; i < n_indices; i++) {
auto typed_logical_index = logical_index_vec[i];
const auto index = static_cast<uint64_t>(typed_logical_index);
// use or update chunk_hint
if (index >= offsets[chunk_hint] &&
(chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
// hint is correct!
} else {
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
}
out_chunk_location_vec[i].chunk_index = chunk_hint;
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_chunk_location_vec[i].index_in_chunk =
typed_logical_index - static_cast<IndexType>(offsets[chunk_hint]);
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning the index-in-chunk value when the logical
// index is out-of-bounds.
if (chunk_hint == num_chunks) {
out_chunk_location_vec[i].index_in_chunk = std::numeric_limits<IndexType>::max();
}
#endif
const auto typed_logical_index = logical_index_vec[i];
const auto loc = ResolveOneInline(num_offsets, offsets, typed_logical_index,
num_chunks, chunk_hint);
out_chunk_location_vec[i] = loc;
chunk_hint = static_cast<int32_t>(loc.chunk_index);
}
}

Expand Down Expand Up @@ -127,30 +141,30 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
TypedChunkLocation<uint8_t>* out_chunk_location_vec,
uint8_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
TypedChunkLocation<uint32_t>* out_chunk_location_vec,
uint32_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
TypedChunkLocation<uint16_t>* out_chunk_location_vec,
uint16_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
TypedChunkLocation<uint32_t>* out_chunk_location_vec,
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
TypedChunkLocation<uint64_t>* out_chunk_location_vec,
uint64_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_location_vec, chunk_hint);
int32_t chunk_hint) const {
ResolveManyInline(static_cast<uint32_t>(offsets_.size()), offsets_.data(), n_indices,
logical_index_vec, out_chunk_location_vec, chunk_hint);
}

} // namespace arrow::internal
57 changes: 30 additions & 27 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct ARROW_EXPORT ChunkResolver {
/// \brief Cache of the index of the last resolved chunk.
///
/// \invariant `cached_chunk_ in [0, chunks.size()]`
mutable std::atomic<int64_t> cached_chunk_;
mutable std::atomic<int32_t> cached_chunk_;

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;
Expand All @@ -92,6 +92,8 @@ struct ARROW_EXPORT ChunkResolver {
for (size_t i = 1; i < offsets_.size(); i++) {
assert(offsets_[i] >= offsets_[i - 1]);
}
assert(offsets_.size() - 1 <=
static_cast<size_t>(std::numeric_limits<int32_t>::max()));
#endif
}

Expand All @@ -102,7 +104,7 @@ struct ARROW_EXPORT ChunkResolver {
ChunkResolver& operator=(const ChunkResolver& other) noexcept;

int64_t logical_array_length() const { return offsets_.back(); }
int64_t num_chunks() const { return static_cast<int64_t>(offsets_.size()) - 1; }
int32_t num_chunks() const { return static_cast<int32_t>(offsets_.size() - 1); }

int64_t chunk_length(int64_t chunk_index) const {
return offsets_[chunk_index + 1] - offsets_[chunk_index];
Expand Down Expand Up @@ -140,9 +142,9 @@ struct ARROW_EXPORT ChunkResolver {
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithHint(int64_t index, ChunkLocation hint) const {
assert(hint.chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, hint.chunk_index);
assert(hint.chunk_index < static_cast<uint32_t>(offsets_.size()));
const auto chunk_index = ResolveChunkIndex</*StoreCachedChunk=*/false>(
index, static_cast<int32_t>(hint.chunk_index));
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

Expand All @@ -169,13 +171,12 @@ struct ARROW_EXPORT ChunkResolver {
[[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint = 0) const {
if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
if constexpr (sizeof(IndexType) < sizeof(uint32_t)) {
// The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
constexpr int64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
// A ChunkedArray with enough empty chunks can make the index of a chunk
// exceed the logical index and thus the maximum value of IndexType.
const bool chunk_index_fits_on_type =
static_cast<uint64_t>(offsets_.size() - 1) <= kMaxIndexTypeValue;
const bool chunk_index_fits_on_type = num_chunks() <= kMaxIndexTypeValue;
if (ARROW_PREDICT_FALSE(!chunk_index_fits_on_type)) {
return false;
}
Expand All @@ -194,34 +195,36 @@ struct ARROW_EXPORT ChunkResolver {
using U = std::make_unsigned_t<IndexType>;
ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
reinterpret_cast<TypedChunkLocation<U>*>(out_chunk_location_vec),
static_cast<U>(chunk_hint));
static_cast<int32_t>(chunk_hint));
} else {
static_assert(std::is_unsigned_v<IndexType>);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec, chunk_hint);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec,
static_cast<int32_t>(chunk_hint));
}
return true;
}

private:
template <bool StoreCachedChunk>
inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
inline int64_t ResolveChunkIndex(int64_t index, int32_t cached_chunk) const {
// It is common for algorithms sequentially processing arrays to make consecutive
// accesses at a relatively small distance from each other, hence often falling in the
// same chunk.
//
// This is guaranteed when merging (assuming each side of the merge uses its
// own resolver), and is the most common case in recursive invocations of
// partitioning.
const auto num_offsets = static_cast<int64_t>(offsets_.size());
const auto num_offsets = static_cast<uint32_t>(offsets_.size());
const int64_t* offsets = offsets_.data();
if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
(cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) {
(static_cast<uint32_t>(cached_chunk + 1) == num_offsets ||
index < offsets[cached_chunk + 1])) {
return cached_chunk;
}
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
if constexpr (StoreCachedChunk) {
assert(chunk_index < static_cast<int64_t>(offsets_.size()));
assert(static_cast<uint32_t>(chunk_index) < static_cast<uint32_t>(offsets_.size()));
cached_chunk_.store(chunk_index, std::memory_order_relaxed);
}
return chunk_index;
Expand All @@ -230,13 +233,13 @@ struct ARROW_EXPORT ChunkResolver {
/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
void ResolveManyImpl(int64_t, const uint8_t*, TypedChunkLocation<uint8_t>*,
uint8_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint16_t*, TypedChunkLocation<uint16_t>*,
uint16_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint32_t*, TypedChunkLocation<uint32_t>*,
uint32_t) const;
int32_t) const;
void ResolveManyImpl(int64_t, const uint64_t*, TypedChunkLocation<uint64_t>*,
uint64_t) const;
int32_t) const;

public:
/// \brief Find the index of the chunk that contains the logical index.
Expand All @@ -249,24 +252,24 @@ struct ARROW_EXPORT ChunkResolver {
/// \pre index >= 0 (otherwise, when index is negative, hi-1 is returned)
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
int64_t hi) {
static inline int32_t Bisect(int64_t index, const int64_t* offsets, int32_t lo,
int32_t hi) {
return Bisect(static_cast<uint64_t>(index),
reinterpret_cast<const uint64_t*>(offsets), static_cast<uint64_t>(lo),
static_cast<uint64_t>(hi));
reinterpret_cast<const uint64_t*>(offsets), static_cast<uint32_t>(lo),
static_cast<uint32_t>(hi));
}

static inline int64_t Bisect(uint64_t index, const uint64_t* offsets, uint64_t lo,
uint64_t hi) {
static inline int32_t Bisect(uint64_t index, const uint64_t* offsets, uint32_t lo,
uint32_t hi) {
// Similar to std::upper_bound(), but slightly different as our offsets
// array always starts with 0.
auto n = hi - lo;
// First iteration does not need to check for n > 1
// (lo < hi is guaranteed by the precondition).
assert(n > 1 && "lo < hi is a precondition of Bisect");
do {
const uint64_t m = n >> 1;
const uint64_t mid = lo + m;
const uint32_t m = n >> 1;
const uint32_t mid = lo + m;
if (index >= offsets[mid]) {
lo = mid;
n -= m;
Expand Down
Loading

0 comments on commit 83f35de

Please sign in to comment.