From aa67e0c8b63508525942d907bc1aa51ecb29c91e Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 29 Mar 2024 15:06:49 +0800 Subject: [PATCH] Pipeline: use `notify` instead of `polling` for `UnorderedSourceOp` (#8872) ref pingcap/tiflash#8869 --- dbms/src/Common/TiFlashMetrics.h | 5 +- dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp | 2 +- .../Pipeline/Schedule/Reactor/WaitReactor.cpp | 9 +- .../Pipeline/Schedule/Tasks/NotifyFuture.cpp | 42 ++++++++ .../Pipeline/Schedule/Tasks/NotifyFuture.h | 39 +++++++ .../Schedule/Tasks/PipeConditionVariable.h | 100 ++++++++++++++++++ .../Pipeline/Schedule/Tasks/PipelineTask.h | 4 +- .../Schedule/Tasks/PipelineTaskBase.h | 44 ++++---- .../Flash/Pipeline/Schedule/Tasks/Task.cpp | 26 +++-- dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h | 3 + .../Pipeline/Schedule/Tasks/TaskProfileInfo.h | 11 +- .../Schedule/ThreadPool/TaskThreadPool.cpp | 9 +- dbms/src/Operators/Operator.h | 1 + dbms/src/Operators/OperatorHelper.cpp | 1 + dbms/src/Operators/UnorderedSourceOp.cpp | 43 ++++---- dbms/src/Operators/UnorderedSourceOp.h | 14 +-- .../DeltaMerge/ReadThread/WorkQueue.h | 20 +++- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 9 +- 18 files changed, 313 insertions(+), 69 deletions(-) create mode 100644 dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp create mode 100644 dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h create mode 100644 dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 4a309eed622..d8237aa9bee 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -742,6 +742,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "pipeline scheduler", \ Gauge, \ F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \ + F(type_wait_for_notify_tasks_count, {"type", "wait_for_notify_tasks_count"}), \ F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \ F(type_cpu_executing_tasks_count, {"type", "cpu_executing_tasks_count"}), \ F(type_io_pending_tasks_count, {"type", "io_pending_tasks_count"}), \ @@ -755,7 +756,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_io_execute, {{"type", "io_execute"}}, ExpBuckets{0.005, 2, 20}), \ F(type_cpu_queue, {{"type", "cpu_queue"}}, ExpBuckets{0.005, 2, 20}), \ F(type_io_queue, {{"type", "io_queue"}}, ExpBuckets{0.005, 2, 20}), \ - F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20})) \ + F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20}), \ + F(type_wait_for_notify, {{"type", "wait_for_notify"}}, ExpBuckets{0.005, 2, 20})) \ M(tiflash_pipeline_task_execute_max_time_seconds_per_round, \ "Bucketed histogram of pipeline task execute max time per round in seconds", \ Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \ @@ -765,6 +767,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "pipeline task change to status", \ Counter, \ F(type_to_waiting, {"type", "to_waiting"}), \ + F(type_to_wait_for_notify, {"type", "to_wait_for_notify"}), \ F(type_to_running, {"type", "to_running"}), \ F(type_to_io, {"type", "to_io"}), \ F(type_to_finished, {"type", "to_finished"}), \ diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index b939713e77a..53b1fda15fb 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -41,7 +41,7 @@ extern const char random_pipeline_model_execute_suffix_failpoint[]; case OperatorStatus::WAITING: \ fillAwaitable((op).get()); \ return (op_status); \ - /* For unexpected status, an immediate return is required. */ \ + /* For other status, an immediate return is required. */ \ default: \ return (op_status); \ } diff --git a/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp b/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp index b018e72f394..8a3a2910a07 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,7 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_) : scheduler(scheduler_) { GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count).Set(0); + GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count).Set(0); thread = std::thread(&WaitReactor::loop, this); } @@ -50,6 +52,9 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task) task_ptr->profile_info.elapsedAwaitTime(); io_tasks.push_back(std::move(task.first)); return true; + case ExecTaskStatus::WAIT_FOR_NOTIFY: + registerTaskToFuture(std::move(task.first)); + return true; case FINISH_STATUS: task_ptr->profile_info.elapsedAwaitTime(); task_ptr->startTraceMemory(); @@ -161,8 +166,8 @@ void WaitReactor::react(WaitingTasks & local_waiting_tasks) ++task_it; } -#ifdef __APPLE__ - auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count); +#if __APPLE__ && __clang__ + __thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count); #else thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count); #endif diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp new file mode 100644 index 00000000000..fc939e1eafa --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp @@ -0,0 +1,42 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +#if __APPLE__ && __clang__ +__thread NotifyFuturePtr current_notify_future = nullptr; +#else +thread_local NotifyFuturePtr current_notify_future = nullptr; +#endif + +void setNotifyFuture(NotifyFuturePtr new_future) +{ + assert(current_notify_future == nullptr); + current_notify_future = std::move(new_future); +} + +void clearNotifyFuture() +{ + current_notify_future.reset(); +} + +void registerTaskToFuture(TaskPtr && task) +{ + assert(current_notify_future != nullptr); + current_notify_future->registerTask(std::move(task)); + current_notify_future.reset(); +} +} // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h new file mode 100644 index 00000000000..4b30ed15389 --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h @@ -0,0 +1,39 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +struct NotifyFuture +{ + NotifyFuture() = default; + virtual ~NotifyFuture() = default; + virtual void registerTask(TaskPtr && task) = 0; +}; +using NotifyFuturePtr = std::shared_ptr; + +#if __APPLE__ && __clang__ +extern __thread NotifyFuturePtr current_notify_future; +#else +extern thread_local NotifyFuturePtr current_notify_future; +#endif + +void setNotifyFuture(NotifyFuturePtr new_future); +void clearNotifyFuture(); +void registerTaskToFuture(TaskPtr && task); + +} // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h new file mode 100644 index 00000000000..79b6ccba1ca --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -0,0 +1,100 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include + +namespace DB +{ +class PipeConditionVariable +{ +public: + inline void registerTask(TaskPtr && task) + { + assert(task); + assert(task->getStatus() == ExecTaskStatus::WAIT_FOR_NOTIFY); + { + std::lock_guard lock(mu); + tasks.push_back(std::move(task)); + } + +#if __APPLE__ && __clang__ + __thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#else + thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#endif + metrics.Increment(); + } + + inline void notifyOne() + { + TaskPtr task; + { + std::lock_guard lock(mu); + if (tasks.empty()) + return; + task = std::move(tasks.front()); + tasks.pop_front(); + } + assert(task); + notifyTaskDirectly(std::move(task)); + +#if __APPLE__ && __clang__ + __thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#else + thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#endif + metrics.Decrement(); + } + + inline void notifyAll() + { + std::deque cur_tasks; + { + std::lock_guard lock(mu); + std::swap(cur_tasks, tasks); + } + size_t tasks_cnt = cur_tasks.size(); + while (!cur_tasks.empty()) + { + notifyTaskDirectly(std::move(cur_tasks.front())); + cur_tasks.pop_front(); + } + +#if __APPLE__ && __clang__ + __thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#else + thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count); +#endif + metrics.Decrement(tasks_cnt); + } + + static inline void notifyTaskDirectly(TaskPtr && task) + { + assert(task); + task->notify(); + task->profile_info.elapsedWaitForNotifyTime(); + assert(TaskScheduler::instance); + TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); + } + +private: + std::mutex mu; + std::deque tasks; +}; +} // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h index 23f2e4ac9c8..318e56179d4 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h @@ -43,7 +43,9 @@ class PipelineTask void doFinalizeImpl() override { - runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() + getScheduleDuration()); + runFinalize( + profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() + + profile_info.getWaitForNotifyTimeNs() + getScheduleDuration()); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h index e9b1cd657f9..34274e33880 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h @@ -29,26 +29,30 @@ namespace DB /// - OperatorStatus::WAITING ==> ExecTaskStatus::WAITING /// - OperatorStatus::NEED_INPUT/HAS_OUTPUT ==> ExecTaskStatus::RUNNING -#define MAP_NOT_RUNNING_TASK_STATUS \ - case OperatorStatus::FINISHED: \ - { \ - return ExecTaskStatus::FINISHED; \ - } \ - case OperatorStatus::CANCELLED: \ - { \ - return ExecTaskStatus::CANCELLED; \ - } \ - case OperatorStatus::IO_IN: \ - { \ - return ExecTaskStatus::IO_IN; \ - } \ - case OperatorStatus::IO_OUT: \ - { \ - return ExecTaskStatus::IO_OUT; \ - } \ - case OperatorStatus::WAITING: \ - { \ - return ExecTaskStatus::WAITING; \ +#define MAP_NOT_RUNNING_TASK_STATUS \ + case OperatorStatus::FINISHED: \ + { \ + return ExecTaskStatus::FINISHED; \ + } \ + case OperatorStatus::CANCELLED: \ + { \ + return ExecTaskStatus::CANCELLED; \ + } \ + case OperatorStatus::IO_IN: \ + { \ + return ExecTaskStatus::IO_IN; \ + } \ + case OperatorStatus::IO_OUT: \ + { \ + return ExecTaskStatus::IO_OUT; \ + } \ + case OperatorStatus::WAITING: \ + { \ + return ExecTaskStatus::WAITING; \ + } \ + case OperatorStatus::WAIT_FOR_NOTIFY: \ + { \ + return ExecTaskStatus::WAIT_FOR_NOTIFY; \ } #define UNEXPECTED_OP_STATUS(op_status, function_name) \ diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp index cf56b1267a1..76a0b1e22db 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -36,12 +37,13 @@ namespace // TODO supports more detailed status transfer metrics, such as from waiting to running. ALWAYS_INLINE void addToStatusMetrics(ExecTaskStatus to) { -#ifdef __APPLE__ -#define M(expect_status, metric_name) \ - case (expect_status): \ - { \ - GET_METRIC(tiflash_pipeline_task_change_to_status, metric_name).Increment(); \ - break; \ +#if __APPLE__ && __clang__ +#define M(expect_status, metric_name) \ + case (expect_status): \ + { \ + __thread auto & metrics_##metric_name = GET_METRIC(tiflash_pipeline_task_change_to_status, metric_name); \ + (metrics_##metric_name).Increment(); \ + break; \ } #else #define M(expect_status, metric_name) \ @@ -56,6 +58,7 @@ ALWAYS_INLINE void addToStatusMetrics(ExecTaskStatus to) switch (to) { M(ExecTaskStatus::WAITING, type_to_waiting) + M(ExecTaskStatus::WAIT_FOR_NOTIFY, type_to_wait_for_notify) M(ExecTaskStatus::RUNNING, type_to_running) M(ExecTaskStatus::IO_IN, type_to_io) M(ExecTaskStatus::IO_OUT, type_to_io) @@ -121,6 +124,7 @@ Task::~Task() catch (...) \ { \ LOG_WARNING(log, "error occurred and cancel the query"); \ + clearNotifyFuture(); \ exec_context.onErrorOccurred(std::current_exception()); \ switchStatus(ExecTaskStatus::ERROR); \ return task_status; \ @@ -128,6 +132,7 @@ Task::~Task() ExecTaskStatus Task::execute() { + assert(current_notify_future == nullptr); assert(mem_tracker_ptr == current_memory_tracker); assert(task_status == ExecTaskStatus::RUNNING); EXECUTE(executeImpl); @@ -135,6 +140,7 @@ ExecTaskStatus Task::execute() ExecTaskStatus Task::executeIO() { + assert(current_notify_future == nullptr); assert(mem_tracker_ptr == current_memory_tracker); assert(task_status == ExecTaskStatus::IO_IN || task_status == ExecTaskStatus::IO_OUT); EXECUTE(executeIOImpl); @@ -142,6 +148,7 @@ ExecTaskStatus Task::executeIO() ExecTaskStatus Task::await() { + assert(current_notify_future == nullptr); // Because await only performs polling checks and does not involve computing/memory tracker memory allocation, // await will not invoke MemoryTracker, so current_memory_tracker must be nullptr here. assert(current_memory_tracker == nullptr); @@ -151,8 +158,15 @@ ExecTaskStatus Task::await() #undef EXECUTE +void Task::notify() +{ + assert(task_status == ExecTaskStatus::WAIT_FOR_NOTIFY); + switchStatus(ExecTaskStatus::RUNNING); +} + void Task::finalize() { + assert(current_notify_future == nullptr); // To make sure that `finalize` only called once. RUNTIME_ASSERT(!is_finalized, log, "finalize can only be called once."); is_finalized = true; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 57c8bf2f408..1681eb45e1b 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -35,6 +35,7 @@ namespace DB */ enum class ExecTaskStatus { + WAIT_FOR_NOTIFY, WAITING, RUNNING, IO_IN, @@ -67,6 +68,8 @@ class Task ExecTaskStatus await(); + void notify(); + // `finalize` must be called before destructuring. // `TaskHelper::FINALIZE_TASK` can help this. void finalize(); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h index b431e42a0e5..27cf25f9b1a 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h @@ -31,16 +31,18 @@ class ProfileInfo ALWAYS_INLINE UInt64 getIOExecuteTimeNs() const { return io_execute_time_ns; } ALWAYS_INLINE UInt64 getIOPendingTimeNs() const { return io_pending_time_ns; } ALWAYS_INLINE UInt64 getAwaitTimeNs() const { return await_time_ns; } + ALWAYS_INLINE UInt64 getWaitForNotifyTimeNs() const { return wait_for_notify_time_ns; } ALWAYS_INLINE String toJson() const { return fmt::format( - R"({{"cpu_execute_time_ns":{},"cpu_pending_time_ns":{},"io_execute_time_ns":{},"io_pending_time_ns":{},"await_time_ns":{}}})", + R"({{"cpu_execute_time_ns":{},"cpu_pending_time_ns":{},"io_execute_time_ns":{},"io_pending_time_ns":{},"await_time_ns":{},"wait_for_notify_time_ns":{}}})", cpu_execute_time_ns, cpu_pending_time_ns, io_execute_time_ns, io_pending_time_ns, - await_time_ns); + await_time_ns, + wait_for_notify_time_ns); } protected: @@ -49,6 +51,7 @@ class ProfileInfo UnitType io_execute_time_ns = 0; UnitType io_pending_time_ns = 0; UnitType await_time_ns = 0; + UnitType wait_for_notify_time_ns = 0; }; class TaskProfileInfo : public ProfileInfo @@ -78,6 +81,8 @@ class TaskProfileInfo : public ProfileInfo ALWAYS_INLINE void elapsedAwaitTime() { await_time_ns += elapsedFromPrev(); } + ALWAYS_INLINE void elapsedWaitForNotifyTime() { wait_for_notify_time_ns += elapsedFromPrev(); } + ALWAYS_INLINE void reportMetrics() const { #ifdef __APPLE__ @@ -111,6 +116,7 @@ class TaskProfileInfo : public ProfileInfo REPORT_DURATION_METRICS(type_io_execute, io_execute_time_ns); REPORT_DURATION_METRICS(type_io_queue, io_pending_time_ns); REPORT_DURATION_METRICS(type_await, await_time_ns); + REPORT_DURATION_METRICS(type_wait_for_notify, wait_for_notify_time_ns); REPORT_ROUND_METRICS(type_cpu, cpu_execute_max_time_ns_per_round); REPORT_ROUND_METRICS(type_io, io_execute_max_time_ns_per_round); @@ -136,6 +142,7 @@ class QueryProfileInfo : public ProfileInfo io_execute_time_ns += task_profile_info.getIOExecuteTimeNs(); io_pending_time_ns += task_profile_info.getIOPendingTimeNs(); await_time_ns += task_profile_info.getAwaitTimeNs(); + wait_for_notify_time_ns += task_profile_info.getWaitForNotifyTimeNs(); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/ThreadPool/TaskThreadPool.cpp b/dbms/src/Flash/Pipeline/Schedule/ThreadPool/TaskThreadPool.cpp index 2e645fca2c3..b379ef633ae 100644 --- a/dbms/src/Flash/Pipeline/Schedule/ThreadPool/TaskThreadPool.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/ThreadPool/TaskThreadPool.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -95,8 +96,8 @@ void TaskThreadPool::handleTask(TaskPtr & task) metrics.incExecutingTask(); metrics.elapsedPendingTime(task); - ExecTaskStatus status_before_exec = task->getStatus(); - ExecTaskStatus status_after_exec = status_before_exec; + auto status_before_exec = task->getStatus(); + auto status_after_exec = status_before_exec; UInt64 total_time_spent = 0; while (true) { @@ -124,6 +125,10 @@ void TaskThreadPool::handleTask(TaskPtr & task) task->endTraceMemory(); scheduler.submitToWaitReactor(std::move(task)); break; + case ExecTaskStatus::WAIT_FOR_NOTIFY: + task->endTraceMemory(); + registerTaskToFuture(std::move(task)); + break; case FINISH_STATUS: task->finalize(); task->endTraceMemory(); diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index 57ee92263db..d6deec19a02 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -37,6 +37,7 @@ enum class OperatorStatus CANCELLED, /// waiting status WAITING, + WAIT_FOR_NOTIFY, /// io status IO_IN, IO_OUT, diff --git a/dbms/src/Operators/OperatorHelper.cpp b/dbms/src/Operators/OperatorHelper.cpp index 814e0ac6547..d266bd5124f 100644 --- a/dbms/src/Operators/OperatorHelper.cpp +++ b/dbms/src/Operators/OperatorHelper.cpp @@ -26,6 +26,7 @@ void assertOperatorStatus(OperatorStatus status, std::initializer_list #include -#include - namespace DB { -OperatorStatus UnorderedSourceOp::readImpl(Block & block) +UnorderedSourceOp::UnorderedSourceOp( + PipelineExecutorContext & exec_context_, + const DM::SegmentReadTaskPoolPtr & task_pool_, + const DM::ColumnDefines & columns_to_read_, + int extra_table_id_index_, + const String & req_id, + const RuntimeFilteList & runtime_filter_list_, + int max_wait_time_ms_) + : SourceOp(exec_context_, req_id) + , task_pool(task_pool_) + , ref_no(0) + , waiting_rf_list(runtime_filter_list_) + , max_wait_time_ms(max_wait_time_ms_) { - if unlikely (done) - return OperatorStatus::HAS_OUTPUT; - - auto await_status = awaitImpl(); - if (await_status == OperatorStatus::HAS_OUTPUT) - std::swap(block, t_block); - return await_status; + setHeader(AddExtraTableIDColumnTransformAction::buildHeader(columns_to_read_, extra_table_id_index_)); + ref_no = task_pool->increaseUnorderedInputStreamRefCount(); } -OperatorStatus UnorderedSourceOp::awaitImpl() +OperatorStatus UnorderedSourceOp::readImpl(Block & block) { if unlikely (done) return OperatorStatus::HAS_OUTPUT; - if unlikely (t_block) - return OperatorStatus::HAS_OUTPUT; while (true) { - if (!task_pool->tryPopBlock(t_block)) - return OperatorStatus::WAITING; - if (t_block) + if (!task_pool->tryPopBlock(block)) + { + setNotifyFuture(task_pool); + return OperatorStatus::WAIT_FOR_NOTIFY; + } + + if (block) { - if unlikely (t_block.rows() == 0) + if unlikely (block.rows() == 0) { - t_block.clear(); + block.clear(); continue; } return OperatorStatus::HAS_OUTPUT; diff --git a/dbms/src/Operators/UnorderedSourceOp.h b/dbms/src/Operators/UnorderedSourceOp.h index 963ff9adefe..65f0bb68550 100644 --- a/dbms/src/Operators/UnorderedSourceOp.h +++ b/dbms/src/Operators/UnorderedSourceOp.h @@ -23,7 +23,6 @@ namespace DB { - /// Read blocks asyncly from Storage Layer by using read thread, /// The result can not guarantee the keep_order property class UnorderedSourceOp : public SourceOp @@ -36,16 +35,7 @@ class UnorderedSourceOp : public SourceOp int extra_table_id_index_, const String & req_id, const RuntimeFilteList & runtime_filter_list_ = std::vector{}, - int max_wait_time_ms_ = 0) - : SourceOp(exec_context_, req_id) - , task_pool(task_pool_) - , ref_no(0) - , waiting_rf_list(runtime_filter_list_) - , max_wait_time_ms(max_wait_time_ms_) - { - setHeader(AddExtraTableIDColumnTransformAction::buildHeader(columns_to_read_, extra_table_id_index_)); - ref_no = task_pool->increaseUnorderedInputStreamRefCount(); - } + int max_wait_time_ms_ = 0); ~UnorderedSourceOp() override { @@ -77,7 +67,6 @@ class UnorderedSourceOp : public SourceOp void operatePrefixImpl() override; OperatorStatus readImpl(Block & block) override; - OperatorStatus awaitImpl() override; private: DM::SegmentReadTaskPoolPtr task_pool; @@ -88,6 +77,5 @@ class UnorderedSourceOp : public SourceOp int max_wait_time_ms; bool done = false; - Block t_block; }; } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h index 1b7f3bfd603..039bb9f4ffb 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h @@ -13,6 +13,7 @@ // limitations under the License. #pragma once +#include #include #include @@ -30,6 +31,7 @@ class WorkQueue std::condition_variable reader_cv; std::condition_variable writer_cv; std::condition_variable finish_cv; + PipeConditionVariable pipe_cv; std::queue queue; bool done; std::size_t max_size; @@ -61,6 +63,20 @@ class WorkQueue , pop_times(0) , pop_empty_times(0) {} + + void registerPipeTask(TaskPtr && task) + { + { + std::lock_guard lock(mu); + if (queue.empty() && !done) + { + pipe_cv.registerTask(std::move(task)); + return; + } + } + PipeConditionVariable::notifyTaskDirectly(std::move(task)); + } + /** * Push an item onto the work queue. Notify a single thread that work is * available. If `finish()` has been called, do nothing and return false. @@ -90,6 +106,7 @@ class WorkQueue *size = queue.size(); } } + pipe_cv.notifyOne(); reader_cv.notify_one(); return true; } @@ -136,7 +153,7 @@ class WorkQueue bool tryPop(T & item) { { - std::unique_lock lock(mu); + std::lock_guard lock(mu); ++pop_times; if (queue.empty()) { @@ -182,6 +199,7 @@ class WorkQueue assert(!done); done = true; } + pipe_cv.notifyAll(); reader_cv.notify_all(); writer_cv.notify_all(); finish_cv.notify_all(); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index a695fd9846a..dfcddc1eed4 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -14,6 +14,7 @@ #pragma once #include +#include #include #include #include @@ -97,7 +98,9 @@ class SegmentReadTasksWrapper std::unordered_map unordered_tasks; }; -class SegmentReadTaskPool : private boost::noncopyable +class SegmentReadTaskPool + : public NotifyFuture + , private boost::noncopyable { public: SegmentReadTaskPool( @@ -167,6 +170,8 @@ class SegmentReadTaskPool : private boost::noncopyable std::once_flag & addToSchedulerFlag() { return add_to_scheduler; } + void registerTask(TaskPtr && task) override { q.registerPipeTask(std::move(task)); } + public: const uint64_t pool_id; @@ -175,7 +180,7 @@ class SegmentReadTaskPool : private boost::noncopyable ColumnDefines & getColumnToRead() { return columns_to_read; } - void appendRSOperator(RSOperatorPtr & new_filter) + void appendRSOperator(RSOperatorPtr & new_filter) const { if (filter->rs_operator == DM::EMPTY_RS_OPERATOR) {