Skip to content

Commit

Permalink
[enhancement](metrics) enhance visibility of flush thread pool (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Nov 12, 2023
1 parent 92581d3 commit e5bf103
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 19 deletions.
39 changes: 30 additions & 9 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,37 @@
#include "olap/memtable_flush_executor.h"

#include <gen_cpp/olap_file.pb.h>
#include <stddef.h>

#include <algorithm>
#include <cstddef>
#include <ostream>

#include "common/config.h"
#include "common/logging.h"
#include "olap/memtable.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "olap/rowset/rowset_writer.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"

namespace doris {
using namespace ErrorCode;

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);

bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");

class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_submit_task_time(submit_task_time) {}
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
}

~MemtableFlushTask() override = default;
~MemtableFlushTask() override { g_flush_task_num << -1; }

void run() override {
_flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
Expand Down Expand Up @@ -144,10 +152,11 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {

min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
max_threads = data_dir_num * min_threads;
ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool);
static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
_register_metrics();
}

// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Expand Down Expand Up @@ -178,4 +187,16 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* fl
return Status::OK();
}

// Registers the hook metrics that expose the state of `_flush_pool`:
//   - flush_thread_pool_queue_size -> _flush_pool->get_queue_size()
//   - flush_thread_pool_thread_num -> _flush_pool->num_threads()
//
// Only `_flush_pool` is observed here; `_high_prio_flush_pool` has no metric.
//
// Both callbacks capture `this` and dereference `_flush_pool`, so this must run
// only after `_flush_pool` has been built (init() calls it as its last step),
// and the hooks must be removed before this object goes away (the destructor
// calls _deregister_metrics() before shutting the pools down).
//
// NOTE(review): REGISTER_HOOK_METRIC is a macro defined outside this view; when
// and on which thread the callbacks run is not visible here -- confirm.
void MemTableFlushExecutor::_register_metrics() {
    REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
                         [this]() { return _flush_pool->get_queue_size(); });
    // Terminated with ';' like every other REGISTER_HOOK_METRIC call site, so the
    // statement does not depend on the macro expansion ending in a semicolon.
    REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
                         [this]() { return _flush_pool->num_threads(); });
}

// Deregisters the two flush thread pool hook metrics (queue size and thread
// number), i.e. the same metric names that _register_metrics() registers.
// The destructor calls this first, before shutting down the thread pools.
// NOTE(review): DEREGISTER_HOOK_METRIC is defined outside this view; presumably
// it removes the hook so the `this`-capturing callbacks are no longer invoked --
// TODO confirm, including whether it is safe when init() was never called.
void MemTableFlushExecutor::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
}

} // namespace doris
9 changes: 6 additions & 3 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

#pragma once

#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <utility>
Expand Down Expand Up @@ -97,8 +96,9 @@ class FlushToken {
// ...
class MemTableFlushExecutor {
public:
MemTableFlushExecutor() {}
MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
_deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
Expand All @@ -111,6 +111,9 @@ class MemTableFlushExecutor {
bool should_serial, bool is_high_priority);

private:
void _register_metrics();
static void _deregister_metrics();

std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};
Expand Down
10 changes: 10 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ class DorisMetrics {
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;

UIntGauge* flush_thread_pool_queue_size;
UIntGauge* flush_thread_pool_thread_num;

UIntGauge* local_scan_thread_pool_queue_size;
UIntGauge* local_scan_thread_pool_thread_num;
UIntGauge* remote_scan_thread_pool_queue_size;
UIntGauge* remote_scan_thread_pool_thread_num;
UIntGauge* limited_scan_thread_pool_queue_size;
UIntGauge* limited_scan_thread_pool_thread_num;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
Expand Down
48 changes: 42 additions & 6 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

#include "scanner_scheduler.h"

#include <stdint.h>

#include <algorithm>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <ostream>
#include <string>
#include <typeinfo>
Expand All @@ -40,6 +40,7 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
Expand All @@ -53,6 +54,13 @@

namespace doris::vectorized {

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT);

ScannerScheduler::ScannerScheduler() = default;

ScannerScheduler::~ScannerScheduler() {
Expand All @@ -66,6 +74,8 @@ ScannerScheduler::~ScannerScheduler() {

_is_closed = true;

_deregister_metrics();

_scheduler_pool->shutdown();
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
Expand Down Expand Up @@ -94,9 +104,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}

// 2. local scan thread pool
_local_scan_thread_pool.reset(
new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan"));
_local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan");

// 3. remote scan thread pool
ThreadPoolBuilder("RemoteScanThreadPool")
Expand All @@ -114,6 +124,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool);

_register_metrics();

_is_init = true;
return Status::OK();
}
Expand Down Expand Up @@ -151,7 +163,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}

[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
auto f = reinterpret_cast<std::function<void()>*>(arg);
auto* f = reinterpret_cast<std::function<void()>*>(arg);
(*f)();
delete f;
return nullptr;
Expand Down Expand Up @@ -401,4 +413,28 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
ctx->push_back_scanner_and_reschedule(scanner);
}

// Registers one queue-size and one thread-number hook metric for each of the
// three scan thread pools (local, remote, limited).
// Every callback captures `this` and dereferences the corresponding pool
// pointer, so this must run only after the pools exist; init() calls it after
// building them.
// NOTE(review): REGISTER_HOOK_METRIC is defined outside this view; when and on
// which thread the callbacks run is not visible here -- confirm.
void ScannerScheduler::_register_metrics() {
// Local scan pool: a PriorityThreadPool (see init()).
REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
[this]() { return _local_scan_thread_pool->get_queue_size(); });
// Unlike the two pools below, the thread number here comes from
// get_active_threads() rather than num_threads().
// NOTE(review): whether "active" means all threads or only busy ones is not
// visible here -- confirm it matches the metric name.
REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
[this]() { return _local_scan_thread_pool->get_active_threads(); });
// Remote scan pool.
REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
[this]() { return _remote_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
[this]() { return _remote_scan_thread_pool->num_threads(); });
// Limited scan pool.
REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
[this]() { return _limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
[this]() { return _limited_scan_thread_pool->num_threads(); });
}

// Deregisters the six scan thread pool hook metrics, in the same order in which
// _register_metrics() registers them (local, remote, limited; queue size then
// thread number).
// The destructor calls this before shutting down the thread pools.
// NOTE(review): DEREGISTER_HOOK_METRIC is defined outside this view; presumably
// it removes the hook so the `this`-capturing callbacks are no longer invoked --
// TODO confirm.
void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
}

} // namespace doris::vectorized
5 changes: 4 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ class ScannerScheduler {
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner);

private:
void _register_metrics();

static void _deregister_metrics();

// Scheduling queue number.
// TODO: make it configurable.
static const int QUEUE_NUM = 4;
Expand Down

0 comments on commit e5bf103

Please sign in to comment.