From c04008c3bf6bf0d452bbaefd4a8ace0baa265fe5 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 13 Jul 2023 12:17:13 +0800 Subject: [PATCH] .*: split profile info from `Interpreters::Join` and implement `PipelineExecutor::getRuntimeStatistics` (#7778) ref pingcap/tiflash#6518 --- dbms/src/Flash/Coprocessor/DAGContext.h | 6 +++--- .../Flash/Coprocessor/DAGQueryBlockInterpreter.cpp | 4 ++-- dbms/src/Flash/Executor/PipelineExecutor.cpp | 10 +++++++++- dbms/src/Flash/Pipeline/Pipeline.cpp | 12 ++++++++++++ dbms/src/Flash/Pipeline/Pipeline.h | 2 ++ dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp | 4 ++-- dbms/src/Flash/Statistics/JoinImpl.cpp | 6 +++--- dbms/src/Interpreters/Join.cpp | 8 ++++++++ dbms/src/Interpreters/Join.h | 13 +++++++++++++ 9 files changed, 54 insertions(+), 11 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 2d6f7569e3f..259847ff8f3 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -52,12 +52,12 @@ using MPPReceiverSetPtr = std::shared_ptr; class CoprocessorReader; using CoprocessorReaderPtr = std::shared_ptr; -class Join; -using JoinPtr = std::shared_ptr; +struct JoinProfileInfo; +using JoinProfileInfoPtr = std::shared_ptr; struct JoinExecuteInfo { String build_side_root_executor_id; - JoinPtr join_ptr; + JoinProfileInfoPtr join_profile_info; BlockInputStreams join_build_streams; OperatorProfileInfos join_build_profile_infos; }; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 44c209c1dd3..7fb03ae4aa2 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -357,8 +357,8 @@ void DAGQueryBlockInterpreter::recordJoinExecuteInfo(size_t build_side_index, co const auto * build_side_root_executor = query_block.children[build_side_index]->root; JoinExecuteInfo join_execute_info; 
join_execute_info.build_side_root_executor_id = build_side_root_executor->executor_id(); - join_execute_info.join_ptr = join_ptr; - assert(join_execute_info.join_ptr); + join_execute_info.join_profile_info = join_ptr->profile_info; + assert(join_execute_info.join_profile_info); dagContext().getJoinExecuteInfoMap()[query_block.source_name] = std::move(join_execute_info); } diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index e2e4817f202..c3e1e3e4e16 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -139,13 +139,21 @@ RU PipelineExecutor::collectRequestUnit() Block PipelineExecutor::getSampleBlock() const { + assert(root_pipeline); return root_pipeline->getSampleBlock(); } BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const { - // TODO support getRuntimeStatistics + assert(root_pipeline); + auto final_plan_exec_id = root_pipeline->getFinalPlanExecId(); BaseRuntimeStatistics runtime_statistics; + if (!final_plan_exec_id.empty()) + { + const auto & final_profile_infos = context.getDAGContext()->getOperatorProfileInfosMap()[final_plan_exec_id]; + for (const auto & profile_info : final_profile_infos) + runtime_statistics.append(*profile_info); + } return runtime_statistics; } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 01cd58ade49..944fcfaf0d7 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -167,6 +167,18 @@ void Pipeline::addGetResultSink(const ResultQueuePtr & result_queue) addPlanNode(get_result_sink); } +String Pipeline::getFinalPlanExecId() const +{ + // NOLINTNEXTLINE(modernize-loop-convert) + for (auto it = plan_nodes.crbegin(); it != plan_nodes.crend(); ++it) + { + const auto & plan_node = *it; + if (plan_node->isTiDBOperator()) + return plan_node->execId(); + } + return ""; +} + PipelineExecGroup 
Pipeline::buildExecGroup(PipelineExecutorContext & exec_context, Context & context, size_t concurrency) { RUNTIME_CHECK(!plan_nodes.empty()); diff --git a/dbms/src/Flash/Pipeline/Pipeline.h b/dbms/src/Flash/Pipeline/Pipeline.h index 64f100cd586..52b83fccd10 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.h +++ b/dbms/src/Flash/Pipeline/Pipeline.h @@ -90,6 +90,8 @@ class Pipeline : public std::enable_shared_from_this /// taskn──┘ └──►taskm EventPtr complete(PipelineExecutorContext & exec_context); + String getFinalPlanExecId() const; + private: void toSelfString(FmtBuffer & buffer, size_t level) const; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index d3090574e89..f0838610f5f 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -50,8 +50,8 @@ void recordJoinExecuteInfo( { JoinExecuteInfo join_execute_info; join_execute_info.build_side_root_executor_id = build_side_executor_id; - join_execute_info.join_ptr = join_ptr; - RUNTIME_CHECK(join_execute_info.join_ptr); + join_execute_info.join_profile_info = join_ptr->profile_info; + RUNTIME_CHECK(join_execute_info.join_profile_info); dag_context.getJoinExecuteInfoMap()[executor_id] = std::move(join_execute_info); } } // namespace diff --git a/dbms/src/Flash/Statistics/JoinImpl.cpp b/dbms/src/Flash/Statistics/JoinImpl.cpp index 76236f31f3a..7b2e3751b2f 100644 --- a/dbms/src/Flash/Statistics/JoinImpl.cpp +++ b/dbms/src/Flash/Statistics/JoinImpl.cpp @@ -40,10 +40,10 @@ void JoinStatistics::collectExtraRuntimeDetail() if (it != join_execute_info_map.end()) { const auto & join_execute_info = it->second; - peak_build_bytes_usage = join_execute_info.join_ptr->getPeakBuildBytesUsage(); + peak_build_bytes_usage = join_execute_info.join_profile_info->peak_build_bytes_usage; build_side_child = join_execute_info.build_side_root_executor_id; - is_spill_enabled = join_execute_info.join_ptr->isEnableSpill(); - 
is_spilled = join_execute_info.join_ptr->isSpilled(); + is_spill_enabled = join_execute_info.join_profile_info->is_spill_enabled; + is_spilled = join_execute_info.join_profile_info->is_spilled; switch (dag_context.getExecutionMode()) { case ExecutionMode::None: diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 02d5cb1dbe3..bc55308e624 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -1540,6 +1540,13 @@ void Join::workAfterBuildFinish() finalizeRuntimeFilter(); } +void Join::finalizeProfileInfo() +{ + profile_info->is_spill_enabled = isEnableSpill(); + profile_info->is_spilled = isSpilled(); + profile_info->peak_build_bytes_usage = getPeakBuildBytesUsage(); +} + void Join::workAfterProbeFinish() { if (isEnableSpill()) @@ -1554,6 +1561,7 @@ void Join::workAfterProbeFinish() } } } + finalizeProfileInfo(); } void Join::waitUntilAllBuildFinished() const diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 9d4ed8ec184..ef10f22a4c8 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -31,10 +31,19 @@ #include #include +#include #include namespace DB { +struct JoinProfileInfo +{ + UInt64 peak_build_bytes_usage = 0; + bool is_spill_enabled = false; + bool is_spilled = false; +}; +using JoinProfileInfoPtr = std::shared_ptr; + class Join; using JoinPtr = std::shared_ptr; @@ -277,6 +286,8 @@ class Join // used to name the column that records matched map entry before other conditions filter const String flag_mapped_entry_helper_name; + const JoinProfileInfoPtr profile_info = std::make_shared(); + private: friend class ScanHashMapAfterProbeBlockInputStream; @@ -450,6 +461,8 @@ class Join void workAfterProbeFinish(); void generateRuntimeFilterValues(const Block & block); + + void finalizeProfileInfo(); }; } // namespace DB