From 297de6052427fbda9997d266812520856c02b87e Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 19 Dec 2023 00:41:07 +0800 Subject: [PATCH] [fix](load) fix memtracking orphan too large --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/olap/memtable.cpp | 11 ++++++++++- be/src/olap/memtable_memory_limiter.cpp | 2 ++ be/src/runtime/tablets_channel.cpp | 9 +++++++++ 5 files changed, 25 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f286c33d547921..82097c920e4ba8 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -586,6 +586,8 @@ DEFINE_mInt32(memtable_soft_limit_active_percent, "50"); // Alignment DEFINE_Int32(memory_max_alignment, "16"); +// memtable insert memory tracker will multiply input block size with this ratio +DEFINE_mDouble(memtable_insert_memory_ratio, "1.4"); // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 4054b315aa40b3..49ea164db32a65 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -645,6 +645,8 @@ DECLARE_mInt32(memtable_soft_limit_active_percent); // Alignment DECLARE_Int32(memory_max_alignment); +// memtable insert memory tracker will multiply input block size with this ratio +DECLARE_mDouble(memtable_insert_memory_ratio); // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 95966edf9dbdc5..e868d79dfb6a6b 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -44,6 +44,7 @@ namespace doris { bvar::Adder g_memtable_cnt("memtable_cnt"); +bvar::Adder g_memtable_input_block_allocated_size("memtable_input_block_allocated_size"); using namespace ErrorCode; @@ -137,6 +138,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { } MemTable::~MemTable() { + g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes(); g_memtable_cnt << -1; if (_keys_type != KeysType::DUP_KEYS) { for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) { @@ -198,6 +200,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vectorconsume(input_size); for (int i = 0; i < num_rows; i++) { @@ -504,6 +510,9 @@ std::unique_ptr MemTable::to_block() { !_tablet_schema->cluster_key_idxes().empty()) { _sort_by_cluster_keys(); } + _input_mutable_block.clear(); + _insert_mem_tracker->release(_mem_usage); + _mem_usage = 0; return vectorized::Block::create_unique(_output_mutable_block.to_block()); } diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index bceb33419a1772..6d5ee2f3f7621f 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -38,6 +38,7 @@ bvar::Status g_memtable_flush_memory("mm_limiter_mem_flush", 0); bvar::Status g_memtable_load_memory("mm_limiter_mem_load", 0); bvar::Status g_load_hard_mem_limit("mm_limiter_limit_hard", 0); bvar::Status g_load_soft_mem_limit("mm_limiter_limit_soft", 0); +bvar::Status g_orphan_memory("mm_limiter_mem_orphan", 0); // Calculate the total memory limit of all load tasks on this BE static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { @@ -236,6 +237,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { g_memtable_load_memory.set_value(_mem_usage); VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size(); THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get()); + g_orphan_memory.set_value(ExecEnv::GetInstance()->orphan_mem_tracker()->consumption()); if (!_hard_limit_reached()) { _hard_limit_end_cond.notify_all(); } diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index d0d742e9152dac..31a1b23d84b606 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -17,6 +17,7 @@ #include "runtime/tablets_channel.h" +#include #include #include #include @@ -41,6 +42,7 @@ #include "olap/storage_engine.h" #include "olap/txn_manager.h" #include "runtime/load_channel.h" +#include "util/defer_op.h" #include "util/doris_metrics.h" #include "util/metrics.h" #include "vec/core/block.h" @@ -48,6 +50,9 @@ namespace doris { class SlotDescriptor; +bvar::Adder g_tablets_channel_send_data_allocated_size( + "tablets_channel_send_data_allocated_size"); + DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic BaseTabletsChannel::_s_tablet_writer_count; @@ -537,6 +542,10 @@ Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request << "block rows: " << send_data.rows() << ", tablet_ids_size: " << request.tablet_ids_size(); + g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes(); + Defer defer { + [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }}; + auto write_tablet_data = [&](uint32_t tablet_id, std::function write_func) { google::protobuf::RepeatedPtrField* tablet_errors =