Skip to content

Commit

Permalink
enhance visibility of flush thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Nov 7, 2023
1 parent 1f9fdac commit d967d94
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 17 deletions.
53 changes: 38 additions & 15 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#include "olap/memtable_flush_executor.h"

#include <gen_cpp/olap_file.pb.h>
#include <stddef.h>

#include <algorithm>
#include <cstddef>
#include <ostream>

#include "common/config.h"
Expand All @@ -29,22 +29,31 @@
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/stopwatch.hpp"
#include "util/time.h"

namespace doris {
using namespace ErrorCode;

// Gauge prototypes for observing the memtable flush thread pool.
// flush_thread_pool_queue_size and flush_thread_pool_thread_num are bound to
// _flush_pool by MemTableFlushExecutor::_register_metrics().
// NOTE(review): no REGISTER_HOOK_METRIC call for flush_task_num is visible in
// this file -- confirm it is registered elsewhere, otherwise this prototype is unused.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_task_num, MetricUnit::NOUNIT);

// Number of MemtableFlushTask objects currently alive: the task's constructor
// adds 1 and its destructor adds -1. Published under the bvar name
// "memtable_flush_task_num".
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");

class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_segment_id(segment_id),
_submit_task_time(submit_task_time) {}
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
}

~MemtableFlushTask() override = default;
~MemtableFlushTask() override { g_flush_task_num << -1; }

void run() override {
_flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
Expand Down Expand Up @@ -122,7 +131,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}

void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) {
void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
int64_t submit_task_time) {
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
Expand All @@ -135,10 +145,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t

MonotonicStopWatch timer;
timer.start();
size_t memory_usage = memtable->memory_usage();
size_t memory_usage = mem_table->memory_usage();

int64_t flush_size;
Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);

{
std::shared_lock rdlk(_flush_status_lock);
Expand All @@ -161,7 +171,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
_stats.flush_running_count--;
_stats.flush_size_bytes += memtable->memory_usage();
_stats.flush_size_bytes += mem_table->memory_usage();
_stats.flush_disk_size_bytes += flush_size;
}

Expand All @@ -180,6 +190,7 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
_register_metrics();
}

// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Expand All @@ -189,26 +200,38 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
if (!is_high_priority) {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
flush_token.reset(
new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
flush_token = std::make_unique<FlushToken>(
_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
flush_token.reset(
new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
flush_token = std::make_unique<FlushToken>(
_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
} else {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
flush_token = std::make_unique<FlushToken>(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
flush_token = std::make_unique<FlushToken>(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}

// Registers hook metrics that expose the state of the normal-priority flush
// pool (_flush_pool): its pending queue size and its current thread count.
//
// Called at the end of init(), after both thread pools have been built.
// The hooks capture `this` and dereference _flush_pool each time they run, so
// they must not outlive this executor; the destructor calls
// _deregister_metrics() before shutting the pools down.
//
// NOTE(review): only _flush_pool is observed here; _high_prio_flush_pool is
// not covered by any metric in this function.
void MemTableFlushExecutor::_register_metrics() {
    REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
                         [this]() { return _flush_pool->get_queue_size(); });
    // Terminate the statement explicitly instead of relying on the macro
    // expansion happening to end with a semicolon.
    REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
                         [this]() { return _flush_pool->num_threads(); });
}

// Deregisters the two flush-pool hook metrics that _register_metrics() installs.
// Invoked first thing in ~MemTableFlushExecutor(), before the thread pools are
// shut down. Uses no instance state (it is declared static in the header).
// NOTE(review): presumably this stops the hooks, which capture `this`, from
// being invoked after destruction -- confirm against DEREGISTER_HOOK_METRIC.
void MemTableFlushExecutor::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
}

} // namespace doris
8 changes: 6 additions & 2 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <stdint.h>
#include <cstdint>

#include <atomic>
#include <iosfwd>
Expand Down Expand Up @@ -108,8 +108,9 @@ class FlushToken {
// ...
class MemTableFlushExecutor {
public:
MemTableFlushExecutor() {}
MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
_deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
Expand All @@ -122,6 +123,9 @@ class MemTableFlushExecutor {
bool should_serial, bool is_high_priority);

private:
void _register_metrics();
static void _deregister_metrics();

std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "",
memtable_memory_limiter_mem_consumption,
Labels({{"type", "load"}}));
// Counter published under the bvar name "memtable_flush_async_num".
// Incremented by 1 in MemTableMemoryLimiter::handle_memtable_flush() for each
// writer handled by the flush-picking code there; nothing visible decrements it.
// NOTE(review): the surrounding code is only partly visible -- confirm whether
// writers whose flush failed (and were cancelled) are meant to be counted too.
bvar::Adder<int64_t> g_flush_async_num("memtable_flush_async_num");

// Calculate the total memory limit of all load tasks on this BE
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
Expand Down Expand Up @@ -135,6 +136,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
LOG(WARNING) << err_msg;
static_cast<void>(writer->cancel_with_status(st));
}
g_flush_async_num << 1;
mem_consumption_in_picked_writer += mem_size;
if (mem_consumption_in_picked_writer > mem_to_flushed) {
break;
Expand Down
3 changes: 3 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ class DorisMetrics {
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;

UIntGauge* flush_thread_pool_queue_size;
UIntGauge* flush_thread_pool_thread_num;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
Expand Down

0 comments on commit d967d94

Please sign in to comment.