Skip to content

Commit

Permalink
[Enhancement] add default hash function and dump query resource usage (
Browse files Browse the repository at this point in the history
…#52080)

Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain authored Oct 18, 2024
1 parent 01bf895 commit 22a1f91
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 224 deletions.
206 changes: 2 additions & 204 deletions be/src/column/column_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <cstddef>
#include <cstdint>
#include <type_traits>

Expand All @@ -31,206 +32,15 @@

#include "column/type_traits.h"
#include "types/logical_type.h"
#include "util/hash_util.hpp"
#include "util/hash.h"
#include "util/slice.h"
#include "util/unaligned_access.h"

#if defined(__aarch64__)
#include "arm_acle.h"
#endif

namespace starrocks {

typedef unsigned __int128 uint128_t;
inline uint64_t umul128(uint64_t a, uint64_t b, uint64_t* high) {
auto result = static_cast<uint128_t>(a) * static_cast<uint128_t>(b);
*high = static_cast<uint64_t>(result >> 64u);
return static_cast<uint64_t>(result);
}

template <int n>
struct phmap_mix {
inline size_t operator()(size_t) const;
};

template <>
class phmap_mix<4> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d51UL;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix<8> {
public:
// Very fast mixing (similar to Abseil)
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d2630458e9ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

enum PhmapSeed { PhmapSeed1, PhmapSeed2 };

template <int n, PhmapSeed seed>
class phmap_mix_with_seed {
public:
inline size_t operator()(size_t) const;
};

template <>
class phmap_mix_with_seed<4, PhmapSeed1> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d51UL;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix_with_seed<8, PhmapSeed1> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d2630458e9ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

template <>
class phmap_mix_with_seed<4, PhmapSeed2> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t kmul = 0xcc9e2d511d;
uint64_t l = a * kmul;
return static_cast<size_t>(l ^ (l >> 32u));
}
};

template <>
class phmap_mix_with_seed<8, PhmapSeed2> {
public:
inline size_t operator()(size_t a) const {
static constexpr uint64_t k = 0xde5fb9d263046000ULL;
uint64_t h;
uint64_t l = umul128(a, k, &h);
return static_cast<size_t>(h + l);
}
};

inline uint32_t crc_hash_32(const void* data, int32_t bytes, uint32_t hash) {
#if defined(__x86_64__) && !defined(__SSE4_2__)
return static_cast<uint32_t>(crc32(hash, (const unsigned char*)data, bytes));
#else
uint32_t words = bytes / sizeof(uint32_t);
bytes = bytes % 4 /*sizeof(uint32_t)*/;

auto* p = reinterpret_cast<const uint8_t*>(data);

while (words--) {
#if defined(__x86_64__)
hash = _mm_crc32_u32(hash, unaligned_load<uint32_t>(p));
#elif defined(__aarch64__)
hash = __crc32cw(hash, unaligned_load<uint32_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint32_t);
}

while (bytes--) {
#if defined(__x86_64__)
hash = _mm_crc32_u8(hash, *p);
#elif defined(__aarch64__)
hash = __crc32cb(hash, *p);
#else
#error "Not supported architecture"
#endif
++p;
}

// The lower half of the CRC hash has has poor uniformity, so swap the halves
// for anyone who only uses the first several bits of the hash.
hash = (hash << 16u) | (hash >> 16u);
return hash;
#endif
}

inline uint64_t crc_hash_64(const void* data, int32_t length, uint64_t hash) {
#if defined(__x86_64__) && !defined(__SSE4_2__)
return crc32(hash, (const unsigned char*)data, length);
#else
if (UNLIKELY(length < 8)) {
return crc_hash_32(data, length, static_cast<uint32_t>(hash));
}

uint64_t words = length / sizeof(uint64_t);
auto* p = reinterpret_cast<const uint8_t*>(data);
auto* end = reinterpret_cast<const uint8_t*>(data) + length;
while (words--) {
#if defined(__x86_64__) && defined(__SSE4_2__)
hash = _mm_crc32_u64(hash, unaligned_load<uint64_t>(p));
#elif defined(__aarch64__)
hash = __crc32cd(hash, unaligned_load<uint64_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint64_t);
}
// Reduce the branch condition
p = end - 8;
#if defined(__x86_64__)
hash = _mm_crc32_u64(hash, unaligned_load<uint64_t>(p));
#elif defined(__aarch64__)
hash = __crc32cd(hash, unaligned_load<uint64_t>(p));
#else
#error "Not supported architecture"
#endif
p += sizeof(uint64_t);
return hash;
#endif
}

// TODO: 0x811C9DC5 is not prime number
static const uint32_t CRC_HASH_SEED1 = 0x811C9DC5;
static const uint32_t CRC_HASH_SEED2 = 0x811C9DD7;

class SliceHash {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED1);
}
};

template <PhmapSeed>
class SliceHashWithSeed {
public:
std::size_t operator()(const Slice& slice) const;
};

template <>
class SliceHashWithSeed<PhmapSeed1> {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED1);
}
};

template <>
class SliceHashWithSeed<PhmapSeed2> {
public:
std::size_t operator()(const Slice& slice) const {
return crc_hash_64(slice.data, static_cast<int32_t>(slice.size), CRC_HASH_SEED2);
}
};

#if defined(__SSE2__) && !defined(ADDRESS_SANITIZER)

// NOTE: This function will access 15 excessive bytes after p1 and p2, which should has padding bytes when allocating
Expand Down Expand Up @@ -276,18 +86,6 @@ class SliceNormalEqual {
}
};

template <class T>
class StdHash {
public:
std::size_t operator()(T value) const { return phmap_mix<sizeof(size_t)>()(std::hash<T>()(value)); }
};

template <class T, PhmapSeed seed>
class StdHashWithSeed {
public:
std::size_t operator()(T value) const { return phmap_mix_with_seed<sizeof(size_t), seed>()(std::hash<T>()(value)); }
};

inline uint64_t crc_hash_uint64(uint64_t value, uint64_t seed) {
#if defined(__x86_64__) && defined(__SSE4_2__)
return _mm_crc32_u64(seed, value);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/join_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void HashTableProbeState::consider_probe_time_locality() {
if ((probe_chunks & (detect_step - 1)) == 0) {
int window_size = std::min(active_coroutines * 4, 50);
if (probe_row_count > window_size) {
phmap::flat_hash_map<uint32_t, uint32_t> occurrence;
phmap::flat_hash_map<uint32_t, uint32_t, StdHash<uint32_t>> occurrence;
occurrence.reserve(probe_row_count);
uint32_t unique_size = 0;
bool enable_interleaving = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/exchange/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class ExchangeSinkOperator final : public Operator {
int32_t _encode_level = 0;
// Will set in prepare
int32_t _be_number = 0;
phmap::flat_hash_map<int64_t, std::unique_ptr<Channel>> _instance_id2channel;
phmap::flat_hash_map<int64_t, std::unique_ptr<Channel>, StdHash<int64_t>> _instance_id2channel;
std::vector<Channel*> _channels;
// index list for channels
// We need a random order of sending channels to avoid rpc blocking at the same time.
Expand Down
23 changes: 12 additions & 11 deletions be/src/exec/pipeline/exchange/sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,30 @@ class SinkBuffer {
/// because TUniqueId::hi is exactly the same in one query

// num eos per instance
phmap::flat_hash_map<int64_t, int64_t> _num_sinkers;
phmap::flat_hash_map<int64_t, int64_t> _request_seqs;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _num_sinkers;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _request_seqs;
// Considering the following situation
// Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2,
// and field transformation are as following
// a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->()
// b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3)
// c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->()
phmap::flat_hash_map<int64_t, int64_t> _max_continuous_acked_seqs;
phmap::flat_hash_map<int64_t, std::unordered_set<int64_t>> _discontinuous_acked_seqs;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _max_continuous_acked_seqs;
phmap::flat_hash_map<int64_t, std::unordered_set<int64_t>, StdHash<int64_t>> _discontinuous_acked_seqs;
std::atomic<int32_t> _total_in_flight_rpc = 0;
std::atomic<int32_t> _num_uncancelled_sinkers = 0;
std::atomic<int32_t> _num_remaining_eos = 0;

// The request needs the reference to the allocated finst id,
// so cache finst id for each dest fragment instance.
phmap::flat_hash_map<int64_t, PUniqueId> _instance_id2finst_id;
phmap::flat_hash_map<int64_t, std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>> _buffers;
phmap::flat_hash_map<int64_t, int32_t> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace> _network_times;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress> _dest_addrs;
phmap::flat_hash_map<int64_t, PUniqueId, StdHash<int64_t>> _instance_id2finst_id;
phmap::flat_hash_map<int64_t, std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>, StdHash<int64_t>>
_buffers;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace, StdHash<int64_t>> _network_times;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>, StdHash<int64_t>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress, StdHash<int64_t>> _dest_addrs;

// True means that SinkBuffer needn't input chunk and send chunk anymore,
// but there may be still in-flight RPC running.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class PipelineDriver {
DriverState _state{DriverState::NOT_READY};
std::shared_ptr<RuntimeProfile> _runtime_profile = nullptr;

phmap::flat_hash_map<int32_t, OperatorStage> _operator_stages;
phmap::flat_hash_map<int32_t, OperatorStage, StdHash<int32_t>> _operator_stages;

workgroup::WorkGroupPtr _workgroup = nullptr;
DriverQueue* _in_queue = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ QueryContext::~QueryContext() noexcept {
// remaining other RuntimeStates after the current RuntimeState is freed, MemChunkAllocator uses the MemTracker of the
// current RuntimeState to release Operators, OperatorFactories in the remaining RuntimeStates will trigger
// segmentation fault.
if (_mem_tracker != nullptr) {
LOG(INFO) << fmt::format(
"finished query_id:{} context life time:{} cpu costs:{} peak memusage:{} scan_bytes:{} spilled "
"bytes:{}",
print_id(query_id()), lifetime(), cpu_cost(), mem_cost_bytes(), get_scan_bytes(), get_spill_bytes());
}

{
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get());
_fragment_mgr.reset();
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/debug/query_trace.h"
#include "util/hash.h"
#include "util/hash_util.hpp"
#include "util/spinlock.h"
#include "util/time.h"
Expand Down Expand Up @@ -349,7 +350,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
// we use spinlock + flat_hash_map here, after upgrading, we can change it to parallel_flat_hash_map
SpinLock _scan_stats_lock;
// table level scan stats
phmap::flat_hash_map<int64_t, std::shared_ptr<ScanStats>> _scan_stats;
phmap::flat_hash_map<int64_t, std::shared_ptr<ScanStats>, StdHash<int64_t>> _scan_stats;

std::unordered_map<int32_t, std::shared_ptr<NodeExecStats>> _node_exec_stats;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/ngram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class NgramFunctionImpl {
}

static NgramHash getAsciiHash(const Gram* ch, size_t gram_num) {
return crc_hash_32(ch, gram_num, CRC_HASH_SEED1) & (0xffffu);
return crc_hash_32(ch, gram_num, CRC_HASH_SEEDS::CRC_HASH_SEED1) & (0xffffu);
}
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class DataStreamMgr {
// map from hash value of fragment instance id/node id pair to stream receivers;
// Ownership of the stream revcr is shared between this instance and the caller of
// create_recvr().
typedef phmap::flat_hash_map<PlanNodeId, std::shared_ptr<DataStreamRecvr>> RecvrMap;
typedef phmap::flat_hash_map<PlanNodeId, std::shared_ptr<DataStreamRecvr>, StdHash<PlanNodeId>> RecvrMap;
typedef phmap::flat_hash_map<TUniqueId, std::shared_ptr<RecvrMap>> StreamMap;
StreamMap _receiver_map[BUCKET_NUM];
std::atomic<uint32_t> _fragment_count{0};
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/local_pass_through_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/vectorized_fwd.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/descriptors.h" // for PlanNodeId
#include "util/hash.h"

namespace starrocks {

Expand All @@ -32,7 +33,7 @@ class PassThroughChunkBuffer {

struct KeyHash {
size_t operator()(const Key& key) const {
uint64_t hash = CRC_HASH_SEED1;
uint64_t hash = CRC_HASH_SEEDS::CRC_HASH_SEED1;
hash = crc_hash_uint64(std::get<0>(key).hi, hash);
hash = crc_hash_uint64(std::get<0>(key).lo, hash);
hash = crc_hash_uint64(std::get<1>(key), hash);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/sender_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ class DataStreamRecvr::PipelineSenderQueue final : public DataStreamRecvr::Sende
// distribution of received sequence numbers:
// part1: { sequence | 1 <= sequence <= _max_processed_sequence }
// part2: { sequence | seq = _max_processed_sequence + i, i > 1 }
phmap::flat_hash_map<int, int64_t> _max_processed_sequences;
phmap::flat_hash_map<int, int64_t, StdHash<int>> _max_processed_sequences;
// chunk request may be out-of-order, but we have to deal with it in order
// key of first level is be_number
// key of second level is request sequence
phmap::flat_hash_map<int, phmap::flat_hash_map<int64_t, ChunkList>> _buffered_chunk_queues;
phmap::flat_hash_map<int, phmap::flat_hash_map<int64_t, ChunkList>, StdHash<int>> _buffered_chunk_queues;

std::atomic<bool> _is_chunk_meta_built{false};

Expand Down
Loading

0 comments on commit 22a1f91

Please sign in to comment.