diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 5f6f3758883..8881f3c75bf 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -113,7 +113,8 @@ namespace DB M(force_fap_worker_throw) \ M(delta_tree_create_node_fail) \ M(disable_flush_cache) \ - M(force_agg_two_level_hash_table_before_merge) + M(force_agg_two_level_hash_table_before_merge) \ + M(force_thread_0_no_agg_spill) #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \ M(pause_with_alter_locks_acquired) \ diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index d1413e32009..602531fa49e 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -161,4 +161,9 @@ BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const } return runtime_statistics; } + +String PipelineExecutor::getExtraJsonInfo() const +{ + return exec_context.getQueryProfileInfo().toJson(); +} } // namespace DB diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index e8eb69b18d4..b86b3b09b9a 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -73,6 +73,8 @@ class PipelineExecutor : public QueryExecutor BaseRuntimeStatistics getRuntimeStatistics() const override; + String getExtraJsonInfo() const override; + protected: ExecutionResult execute(ResultHandler && result_handler) override; diff --git a/dbms/src/Flash/Executor/QueryExecutor.h b/dbms/src/Flash/Executor/QueryExecutor.h index 558f3b2504a..6b5fcfda349 100644 --- a/dbms/src/Flash/Executor/QueryExecutor.h +++ b/dbms/src/Flash/Executor/QueryExecutor.h @@ -55,6 +55,8 @@ class QueryExecutor virtual BaseRuntimeStatistics getRuntimeStatistics() const = 0; + virtual String getExtraJsonInfo() const { return "{}"; } + protected: virtual ExecutionResult execute(ResultHandler &&) = 0; diff --git 
a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 80932223202..75c5c221ed4 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -579,6 +579,7 @@ void MPPTask::runImpl() GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru); mpp_task_statistics.setRUInfo( RUConsumption{.cpu_ru = cpu_ru, .cpu_time_ns = cpu_time_ns, .read_ru = read_ru, .read_bytes = read_bytes}); + mpp_task_statistics.setExtraInfo(query_executor_holder->getExtraJsonInfo()); mpp_task_statistics.collectRuntimeStatistics(); diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 09883c63b3f..0be18a3abff 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -114,7 +114,7 @@ void MPPTaskStatistics::logTracingJson() R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})" R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" - R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{}}})", + R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{},"extra_info":{}}})", id.gather_id.query_id.start_ts, id.task_id, is_root, @@ -135,7 +135,8 @@ void MPPTaskStatistics::logTracingJson() error_message, ru_info.cpu_ru, ru_info.read_ru, - memory_peak); + memory_peak, + extra_info); } void MPPTaskStatistics::setMemoryPeak(Int64 memory_peak_) @@ -155,6 +156,11 @@ void MPPTaskStatistics::setCompileTimestamp(const Timestamp & start_timestamp, c compile_end_timestamp = end_timestamp; } +void MPPTaskStatistics::setExtraInfo(const String & extra_info_) +{ + extra_info = extra_info_; +} + void MPPTaskStatistics::recordInputBytes(DAGContext & dag_context) { switch (dag_context.getExecutionMode()) diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.h b/dbms/src/Flash/Mpp/MPPTaskStatistics.h index 
334ead31eb8..ef012210407 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.h +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.h @@ -55,6 +55,8 @@ class MPPTaskStatistics void setCompileTimestamp(const Timestamp & start_timestamp, const Timestamp & end_timestamp); + void setExtraInfo(const String & extra_info_); + tipb::SelectResponse genExecutionSummaryResponse(); tipb::TiFlashExecutionInfo genTiFlashExecutionInfo(); @@ -92,5 +94,8 @@ class MPPTaskStatistics // resource RUConsumption ru_info{.cpu_ru = 0.0, .cpu_time_ns = 0, .read_ru = 0.0, .read_bytes = 0}; Int64 memory_peak = 0; + + // extra + String extra_info = "{}"; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index 53b1fda15fb..5ef36b27766 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -24,44 +24,52 @@ extern const char random_pipeline_model_execute_prefix_failpoint[]; extern const char random_pipeline_model_execute_suffix_failpoint[]; } // namespace FailPoints -#define HANDLE_OP_STATUS(op, op_status, expect_status) \ - switch (op_status) \ - { \ - /* For the expected status, it will not return here, */ \ - /* but instead return control to the macro caller, */ \ - /* who will continue to call the next operator. */ \ - case (expect_status): \ - break; \ - /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ - case OperatorStatus::IO_IN: \ - case OperatorStatus::IO_OUT: \ - fillIOOp((op).get()); \ - return (op_status); \ - /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ - case OperatorStatus::WAITING: \ - fillAwaitable((op).get()); \ - return (op_status); \ - /* For other status, an immediate return is required. 
*/ \ - default: \ - return (op_status); \ +#define HANDLE_OP_STATUS(op, op_status, expect_status) \ + switch (op_status) \ + { \ + /* For the expected status, it will not return here, */ \ + /* but instead return control to the macro caller, */ \ + /* who will continue to call the next operator. */ \ + case (expect_status): \ + break; \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ + case OperatorStatus::IO_IN: \ + case OperatorStatus::IO_OUT: \ + fillIOOp((op).get()); \ + return (op_status); \ + /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ + case OperatorStatus::WAITING: \ + fillAwaitable((op).get()); \ + return (op_status); \ + /* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \ + case OperatorStatus::WAIT_FOR_NOTIFY: \ + fillWaitingForNotifyOp((op).get()); \ + return (op_status); \ + /* For other status, an immediate return is required. */ \ + default: \ + return (op_status); \ } -#define HANDLE_LAST_OP_STATUS(op, op_status) \ - assert(op); \ - switch (op_status) \ - { \ - /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ - case OperatorStatus::IO_IN: \ - case OperatorStatus::IO_OUT: \ - fillIOOp((op).get()); \ - return (op_status); \ - /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ - case OperatorStatus::WAITING: \ - fillAwaitable((op).get()); \ - return (op_status); \ - /* For the last operator, the status will always be returned. */ \ - default: \ - return (op_status); \ +#define HANDLE_LAST_OP_STATUS(op, op_status) \ + assert(op); \ + switch (op_status) \ + { \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. 
*/ \ + case OperatorStatus::IO_IN: \ + case OperatorStatus::IO_OUT: \ + fillIOOp((op).get()); \ + return (op_status); \ + /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ + case OperatorStatus::WAITING: \ + fillAwaitable((op).get()); \ + return (op_status); \ + /* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \ + case OperatorStatus::WAIT_FOR_NOTIFY: \ + fillWaitingForNotifyOp((op).get()); \ + return (op_status); \ + /* For the last operator, the status will always be returned. */ \ + default: \ + return (op_status); \ } PipelineExec::PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_) @@ -89,6 +97,13 @@ void PipelineExec::executeSuffix() source_op->operateSuffix(); } +void PipelineExec::notify() +{ + assert(waiting_for_notify); + waiting_for_notify->notify(); + waiting_for_notify = nullptr; +} + OperatorStatus PipelineExec::execute() { auto op_status = executeImpl(); @@ -107,6 +122,10 @@ OperatorStatus PipelineExec::execute() */ OperatorStatus PipelineExec::executeImpl() { + assert(!awaitable); + assert(!io_op); + assert(!waiting_for_notify); + Block block; size_t start_transform_op_index = 0; auto op_status = fetchBlock(block, start_transform_op_index); @@ -156,12 +175,25 @@ OperatorStatus PipelineExec::executeIO() } OperatorStatus PipelineExec::executeIOImpl() { + assert(!waiting_for_notify); + assert(!awaitable); assert(io_op); auto op_status = io_op->executeIO(); - if (op_status == OperatorStatus::WAITING) + switch (op_status) + { + case OperatorStatus::IO_IN: + case OperatorStatus::IO_OUT: + return op_status; + case OperatorStatus::WAITING: fillAwaitable(io_op); - if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT) - io_op = nullptr; + break; + case OperatorStatus::WAIT_FOR_NOTIFY: + fillWaitingForNotifyOp(io_op); + break; + default: + break; + } + io_op = nullptr; return op_status; 
} @@ -177,40 +209,59 @@ OperatorStatus PipelineExec::await() } OperatorStatus PipelineExec::awaitImpl() { + assert(!waiting_for_notify); + assert(!io_op); assert(awaitable); auto op_status = awaitable->await(); - if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT) + switch (op_status) + { + case OperatorStatus::WAITING: + return op_status; + case OperatorStatus::IO_IN: + case OperatorStatus::IO_OUT: fillIOOp(awaitable); - if (op_status != OperatorStatus::WAITING) - awaitable = nullptr; + break; + case OperatorStatus::WAIT_FOR_NOTIFY: + fillWaitingForNotifyOp(awaitable); + break; + default: + break; + } + awaitable = nullptr; return op_status; } #undef HANDLE_OP_STATUS #undef HANDLE_LAST_OP_STATUS -void PipelineExec::finalizeProfileInfo(UInt64 extra_time) +void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time) { - // `extra_time` usually includes pipeline schedule duration and task queuing time. - // - // The pipeline schedule duration should be added to the pipeline breaker operator(AggConvergent and JoinProbe), + // For the pipeline_breaker_wait_time, it should be added to the pipeline breaker operator(AggConvergent and JoinProbe), // However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex. // Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator. // - // ditto for task queuing time. + // For the queuing_time, it should be evenly distributed across all operators. // // TODO Refining execution summary, excluding extra time from execution time. 
- // For example: [total_time:6s, execution_time:1s, pending_time:2s, pipeline_waiting_time:3s] + // For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s] + + // The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + (i + 1) * queuing_time / operator_num + pipeline_breaker_wait_time. + + source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time; + + UInt64 operator_num = 2 + transform_ops.size(); + UInt64 per_operator_queuing_time = queuing_time / operator_num; - // The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + extra_time. - source_op->getProfileInfo()->execution_time += extra_time; - extra_time = source_op->getProfileInfo()->execution_time; + source_op->getProfileInfo()->execution_time += per_operator_queuing_time; + // Compensate for the values missing due to rounding. + source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num)); + UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time; for (const auto & transform_op : transform_ops) { - transform_op->getProfileInfo()->execution_time += extra_time; - extra_time = transform_op->getProfileInfo()->execution_time; + transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op); + time_for_prev_op = transform_op->getProfileInfo()->execution_time; } - sink_op->getProfileInfo()->execution_time += extra_time; + sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op); } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h index 275c019cc97..103e0ad3e80 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h @@ -37,7 +37,9 @@ class PipelineExec : private boost::noncopyable OperatorStatus await(); - void 
finalizeProfileInfo(UInt64 extra_time); + void notify(); + + void finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time); private: inline OperatorStatus executeImpl(); @@ -62,6 +64,13 @@ class PipelineExec : private boost::noncopyable io_op = op; } + ALWAYS_INLINE void fillWaitingForNotifyOp(Operator * op) + { + assert(!waiting_for_notify); + assert(op); + waiting_for_notify = op; + } + private: SourceOpPtr source_op; TransformOps transform_ops; @@ -72,6 +81,9 @@ class PipelineExec : private boost::noncopyable // hold the operator which is ready for executing io. Operator * io_op = nullptr; + + // hold the operator which is waiting for notify. + Operator * waiting_for_notify = nullptr; }; using PipelineExecPtr = std::unique_ptr; // a set of pipeline_execs running in parallel. diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp index 9ad7b9af763..b7b9e034f4c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.cpp @@ -13,7 +13,9 @@ // limitations under the License. #include +#include #include +#include namespace DB { @@ -31,6 +33,33 @@ void AggregateFinalConvertEvent::scheduleImpl() void AggregateFinalConvertEvent::finishImpl() { + if (need_final_spill) + { + /// Currently, the aggregation spill algorithm requires all bucket data to be spilled, + /// so a new event is added here to execute the final spill. 
+ /// ...──►AggregateBuildSinkOp[local spill]──┐ + /// ...──►AggregateBuildSinkOp[local spill]──┤ ┌──►AggregateFinalSpillTask + /// ...──►AggregateBuildSinkOp[local spill]──┼──►[final spill]AggregateFinalSpillEvent─┼──►AggregateFinalSpillTask + /// ...──►AggregateBuildSinkOp[local spill]──┤ └──►AggregateFinalSpillTask + /// ...──►AggregateBuildSinkOp[local spill]──┘ + std::vector indexes; + for (size_t index = 0; index < agg_context->getBuildConcurrency(); ++index) + { + if (agg_context->needSpill(index, /*try_mark_need_spill=*/true)) + indexes.push_back(index); + } + if (!indexes.empty()) + { + auto final_spill_event = std::make_shared( + exec_context, + log->identifier(), + agg_context, + std::move(indexes), + std::move(profile_infos)); + insertEvent(final_spill_event); + } + } + auto dur = getFinishDuration(); for (const auto & profile_info : profile_infos) profile_info->execution_time += dur; diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h index 8f5f204d867..9209ab5eecc 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/AggregateFinalConvertEvent.h @@ -30,10 +30,12 @@ class AggregateFinalConvertEvent : public Event const String & req_id, AggregateContextPtr agg_context_, std::vector && indexes_, + bool need_final_spill_, OperatorProfileInfos && profile_infos_) : Event(exec_context_, req_id) , agg_context(std::move(agg_context_)) , indexes(std::move(indexes_)) + , need_final_spill(need_final_spill_) , profile_infos(std::move(profile_infos_)) { assert(agg_context); @@ -49,6 +51,7 @@ class AggregateFinalConvertEvent : public Event private: AggregateContextPtr agg_context; std::vector indexes; + bool need_final_spill; OperatorProfileInfos profile_infos; }; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h 
b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h index 963433248d1..2f0171b8ba2 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -76,9 +76,16 @@ class PipeConditionVariable { assert(task); task->notify(); - task->profile_info.elapsedWaitForNotifyTime(); assert(TaskScheduler::instance); - TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); + if (unlikely(task->getStatus() == ExecTaskStatus::WAITING)) + { + TaskScheduler::instance->submitToWaitReactor(std::move(task)); + } + else + { + assert(task->getStatus() == ExecTaskStatus::RUNNING); + TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); + } } private: diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h index 318e56179d4..d7201697c2f 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h @@ -41,11 +41,11 @@ class PipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } + void notifyImpl() override { runNotify(); } + void doFinalizeImpl() override { - runFinalize( - profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() - + profile_info.getWaitForNotifyTimeNs() + getScheduleDuration()); + runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), getScheduleDuration()); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h index 34274e33880..991813dd52d 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h @@ -124,15 +124,21 @@ class PipelineTaskBase } } - void runFinalize(UInt64 extra_time) + void runFinalize(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time) { assert(pipeline_exec); 
pipeline_exec->executeSuffix(); - pipeline_exec->finalizeProfileInfo(extra_time); + pipeline_exec->finalizeProfileInfo(queuing_time, pipeline_breaker_wait_time); pipeline_exec = nullptr; pipeline_exec_holder.reset(); } + void runNotify() + { + assert(pipeline_exec); + pipeline_exec->notify(); + } + private: PipelineExecPtr pipeline_exec_holder; // To reduce the overheads of `pipeline_exec_holder.get()` diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h index c713bfe95f5..03f30962ca0 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h @@ -40,9 +40,11 @@ class SimplePipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } + void notifyImpl() override { runNotify(); } + void finalizeImpl() override { - runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()); + runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), 0); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp index 2a060173425..c4e09c0e864 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp @@ -151,7 +151,14 @@ ExecTaskStatus Task::await() void Task::notify() { assert(task_status == ExecTaskStatus::WAIT_FOR_NOTIFY); - switchStatus(ExecTaskStatus::RUNNING); + // If the query has been canceled, + // move the task to WaitReactor to quickly trigger the cancel process. 
+ if (unlikely(exec_context.isCancelled())) + switchStatus(ExecTaskStatus::WAITING); + else + switchStatus(ExecTaskStatus::RUNNING); + notifyImpl(); + profile_info.elapsedWaitForNotifyTime(); } void Task::finalize() diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 1681eb45e1b..e47ce10f38c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -103,6 +103,8 @@ class Task // Avoid allocating memory in `await` if possible. virtual ExecTaskStatus awaitImpl() { return ExecTaskStatus::RUNNING; } + virtual void notifyImpl() {} + // Used to release held resources, just like `Event::finishImpl`. virtual void finalizeImpl() {} diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp index 32bd3d31033..f0b83ede5f2 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp @@ -88,13 +88,42 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec SCOPE_EXIT({ aggregate_context.reset(); }); aggregate_context->getAggSpillContext()->finishSpillableStage(); - bool need_final_spill = false; - for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i) + bool need_final_spill = aggregate_context->hasSpilledData(); + if (!need_final_spill) { - if (aggregate_context->getAggSpillContext()->isThreadMarkedForAutoSpill(i)) + for (size_t i = 0; i < aggregate_context->getBuildConcurrency(); ++i) { - need_final_spill = true; - break; + if (aggregate_context->getAggSpillContext()->isThreadMarkedForAutoSpill(i)) + { + need_final_spill = true; + break; + } + } + } + + if (aggregate_context->isConvertibleToTwoLevel()) + { + bool need_convert_to_two_level = need_final_spill || aggregate_context->hasAtLeastOneTwoLevel(); + fiu_do_on(FailPoints::force_agg_two_level_hash_table_before_merge, { 
need_convert_to_two_level = true; }); + if (need_convert_to_two_level) + { + std::vector indexes; + for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index) + { + if (!aggregate_context->isTwoLevelOrEmpty(index)) + indexes.push_back(index); + } + if (!indexes.empty()) + { + auto final_convert_event = std::make_shared( + exec_context, + log->identifier(), + aggregate_context, + std::move(indexes), + need_final_spill, + std::move(profile_infos)); + return final_convert_event; + } } } @@ -125,31 +154,6 @@ EventPtr PhysicalAggregationBuild::doSinkComplete(PipelineExecutorContext & exec } } - if (!aggregate_context->hasSpilledData() && aggregate_context->isConvertibleToTwoLevel()) - { - bool has_two_level = aggregate_context->hasAtLeastOneTwoLevel(); - fiu_do_on(FailPoints::force_agg_two_level_hash_table_before_merge, { has_two_level = true; }); - if (has_two_level) - { - std::vector indexes; - for (size_t index = 0; index < aggregate_context->getBuildConcurrency(); ++index) - { - if (!aggregate_context->isTwoLevelOrEmpty(index)) - indexes.push_back(index); - } - if (!indexes.empty()) - { - auto final_convert_event = std::make_shared( - exec_context, - log->identifier(), - aggregate_context, - std::move(indexes), - std::move(profile_infos)); - return final_convert_event; - } - } - } - return nullptr; } } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_spill_aggregation.cpp b/dbms/src/Flash/tests/gtest_spill_aggregation.cpp index c32bb1a020b..b19aaf03c4c 100644 --- a/dbms/src/Flash/tests/gtest_spill_aggregation.cpp +++ b/dbms/src/Flash/tests/gtest_spill_aggregation.cpp @@ -22,6 +22,7 @@ namespace DB namespace FailPoints { extern const char force_agg_on_partial_block[]; +extern const char force_thread_0_no_agg_spill[]; } // namespace FailPoints namespace tests @@ -47,6 +48,17 @@ class SpillAggregationTestRunner : public DB::tests::ExecutorTest #define WRAP_FOR_AGG_PARTIAL_BLOCK_END } +#define WRAP_FOR_AGG_THREAD_0_NO_SPILL_START \ + for 
(auto thread_0_no_spill : {true, false}) \ + { \ + if (thread_0_no_spill) \ + FailPointHelper::enableFailPoint(FailPoints::force_thread_0_no_agg_spill); \ + else \ + FailPointHelper::disableFailPoint(FailPoints::force_thread_0_no_agg_spill); + +#define WRAP_FOR_AGG_THREAD_0_NO_SPILL_END } + + #define WRAP_FOR_SPILL_TEST_BEGIN \ std::vector pipeline_bools{false, true}; \ for (auto enable_pipeline : pipeline_bools) \ @@ -103,9 +115,11 @@ try /// don't use `executeAndAssertColumnsEqual` since it takes too long to run /// test single thread aggregation WRAP_FOR_AGG_PARTIAL_BLOCK_START + WRAP_FOR_AGG_THREAD_0_NO_SPILL_START ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1)); /// test parallel aggregation ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams)); + WRAP_FOR_AGG_THREAD_0_NO_SPILL_END WRAP_FOR_AGG_PARTIAL_BLOCK_END /// enable spill and use small max_cached_data_bytes_in_spiller context.context->setSetting("max_cached_data_bytes_in_spiller", Field(static_cast(total_data_size / 200))); @@ -249,6 +263,7 @@ try context.context->setSetting("max_block_size", Field(static_cast(max_block_size))); WRAP_FOR_SPILL_TEST_BEGIN WRAP_FOR_AGG_PARTIAL_BLOCK_START + WRAP_FOR_AGG_THREAD_0_NO_SPILL_START auto blocks = getExecuteStreamsReturnBlocks(request, concurrency); for (auto & block : blocks) { @@ -273,6 +288,7 @@ try vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(), false)); } + WRAP_FOR_AGG_THREAD_0_NO_SPILL_END WRAP_FOR_AGG_PARTIAL_BLOCK_END WRAP_FOR_SPILL_TEST_END } @@ -402,6 +418,7 @@ try context.context->setSetting("max_block_size", Field(static_cast(max_block_size))); WRAP_FOR_SPILL_TEST_BEGIN WRAP_FOR_AGG_PARTIAL_BLOCK_START + WRAP_FOR_AGG_THREAD_0_NO_SPILL_START auto blocks = getExecuteStreamsReturnBlocks(request, concurrency); for (auto & block : blocks) { @@ -426,6 +443,7 @@ try vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(), false)); } + WRAP_FOR_AGG_THREAD_0_NO_SPILL_END WRAP_FOR_AGG_PARTIAL_BLOCK_END 
WRAP_FOR_SPILL_TEST_END } diff --git a/dbms/src/Interpreters/AggSpillContext.cpp b/dbms/src/Interpreters/AggSpillContext.cpp index fa307fdd88a..051cadd5ab3 100644 --- a/dbms/src/Interpreters/AggSpillContext.cpp +++ b/dbms/src/Interpreters/AggSpillContext.cpp @@ -21,6 +21,7 @@ namespace DB namespace FailPoints { extern const char random_marked_for_auto_spill[]; +extern const char force_thread_0_no_agg_spill[]; } // namespace FailPoints AggSpillContext::AggSpillContext( @@ -55,6 +56,12 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr if (new_value == 0) // new_value == 0 means no agg data to spill return false; + fiu_do_on(FailPoints::force_thread_0_no_agg_spill, { + if (thread_num == 0) + { + return false; + } + }); if (auto_spill_mode) { AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL; diff --git a/dbms/src/Operators/ConcatSourceOp.h b/dbms/src/Operators/ConcatSourceOp.h index 29f47be70e8..38e35257065 100644 --- a/dbms/src/Operators/ConcatSourceOp.h +++ b/dbms/src/Operators/ConcatSourceOp.h @@ -147,6 +147,12 @@ class ConcatSourceOp : public SourceOp return status; } + void notifyImpl() override + { + assert(cur_exec); + cur_exec->notify(); + } + private: bool popExec() { diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index 109a339be41..827d349487a 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -88,6 +88,12 @@ OperatorStatus Operator::executeIO() return op_status; } +void Operator::notify() +{ + profile_info.update(); + notifyImpl(); +} + OperatorStatus SourceOp::read(Block & block) { CHECK_IS_CANCELLED diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index d6deec19a02..889fcfa05cf 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -66,6 +66,8 @@ class Operator // running status may return are NEED_INPUT and HAS_OUTPUT here. 
OperatorStatus await(); + void notify(); + // These two methods are used to set state, log and etc, and should not perform calculation logic. void operatePrefix(); void operateSuffix(); @@ -98,6 +100,8 @@ class Operator virtual OperatorStatus awaitImpl() { throw Exception("Unsupport"); } + virtual void notifyImpl() {} + protected: PipelineExecutorContext & exec_context; const LoggerPtr log;