From f3a672782a24105c19701a6f30697e5d39920e72 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 4 Apr 2023 17:50:57 +0800 Subject: [PATCH 1/4] Compatible new field in write/lock cf with TiKV (#7214) close pingcap/tiflash#7212 --- .../Storages/Transaction/RegionCFDataBase.cpp | 7 +++++++ .../Storages/Transaction/TiKVRecordFormat.h | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp index b5079c9b554..8a40a5aa795 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -355,6 +355,13 @@ inline void decodeLockCfValue(DecodedLockCFValue & res) UNUSED(versions_to_last_change); break; } + case TXN_SOURCE_PREFIX_FOR_LOCK: + { + // Used for CDC, useless for TiFlash. + UInt64 txn_source_prefic = readVarUInt(data, len); + UNUSED(txn_source_prefic); + break; + } default: { std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString(); diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.h b/dbms/src/Storages/Transaction/TiKVRecordFormat.h index 19bec5be53a..37dcd77b014 100644 --- a/dbms/src/Storages/Transaction/TiKVRecordFormat.h +++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.h @@ -66,6 +66,8 @@ static const char ROLLBACK_TS_PREFIX = 'r'; static const char FLAG_OVERLAPPED_ROLLBACK = 'R'; static const char GC_FENCE_PREFIX = 'F'; static const char LAST_CHANGE_PREFIX = 'l'; +static const char TXN_SOURCE_PREFIX_FOR_WRITE = 'S'; +static const char TXN_SOURCE_PREFIX_FOR_LOCK = 's'; static const size_t SHORT_VALUE_MAX_LEN = 64; @@ -427,6 +429,22 @@ inline DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value) * rewriting record and there must be a complete row written to tikv, just ignore it in tiflash. */ return std::nullopt; + case RecordKVFormat::LAST_CHANGE_PREFIX: + { + // Used to accelerate TiKV MVCC scan, useless for TiFlash. + UInt64 last_change_ts = readUInt64(data, len); + UInt64 versions_to_last_change = readVarUInt(data, len); + UNUSED(last_change_ts); + UNUSED(versions_to_last_change); + break; + } + case RecordKVFormat::TXN_SOURCE_PREFIX_FOR_WRITE: + { + // Used for CDC, useless for TiFlash. 
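For context, the layout being skipped in this branch is a one-byte flag ('S' in the write cf, 's' in the lock cf) followed by a var-uint payload that only matters to TiCDC. A minimal standalone sketch of consuming such a field, with helper names invented for illustration (the patch itself uses TiFlash's existing readVarUInt, as in the case below):

#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Simplified LEB128-style var-uint reader, only here to keep the sketch self-contained.
static uint64_t readVarUInt64(const char *& data, size_t & len)
{
    uint64_t value = 0;
    for (int shift = 0; shift < 64 && len > 0; shift += 7)
    {
        auto byte = static_cast<uint8_t>(*data);
        ++data;
        --len;
        value |= static_cast<uint64_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0)
            return value;
    }
    throw std::runtime_error("ill-formed var-uint in lock/write cf value");
}

// Skip one optional <flag><var-uint> field such as the txn-source field.
// TiFlash only needs to consume the bytes so the fields that follow stay aligned.
static void skipTxnSourceField(const char *& data, size_t & len)
{
    [[maybe_unused]] uint64_t txn_source = readVarUInt64(data, len);
}
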
+ UInt64 txn_source_prefic = readVarUInt(data, len); + UNUSED(txn_source_prefic); + break; + } default: throw Exception("invalid flag " + std::to_string(flag) + " in write cf", ErrorCodes::LOGICAL_ERROR); } From 2867b01a475257b1369580e968a13abec4978feb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Apr 2023 10:54:57 +0800 Subject: [PATCH 2/4] Pipeline: refine result handler for batch cop (#7175) ref pingcap/tiflash#6518 --- .../PtrHolder.h} | 28 ++++---- .../WNEstablishDisaggTaskHandler.cpp | 1 - .../WNEstablishDisaggTaskHandler.h | 2 +- dbms/src/Flash/Executor/PipelineExecutor.cpp | 67 ++++++++++++++----- dbms/src/Flash/Executor/PipelineExecutor.h | 8 +++ .../Flash/Executor/PipelineExecutorStatus.cpp | 25 +++++-- .../Flash/Executor/PipelineExecutorStatus.h | 10 ++- dbms/src/Flash/Executor/QueryExecutor.h | 2 + dbms/src/Flash/Executor/ResultQueue.h | 27 ++++++++ dbms/src/Flash/Mpp/MPPTask.cpp | 12 +++- dbms/src/Flash/Mpp/MPPTask.h | 2 +- .../Exec/tests/gtest_simple_operator.cpp | 42 +++++++++++- dbms/src/Flash/Pipeline/Pipeline.cpp | 4 +- dbms/src/Flash/Pipeline/Pipeline.h | 3 +- .../Planner/Plans/PhysicalGetResultSink.cpp | 7 +- .../Planner/Plans/PhysicalGetResultSink.h | 17 ++--- dbms/src/Operators/GetResultSinkOp.cpp | 41 ++++++++++-- dbms/src/Operators/GetResultSinkOp.h | 17 +++-- 18 files changed, 243 insertions(+), 72 deletions(-) rename dbms/src/{Flash/Executor/QueryExecutorHolder.h => Common/PtrHolder.h} (61%) create mode 100644 dbms/src/Flash/Executor/ResultQueue.h diff --git a/dbms/src/Flash/Executor/QueryExecutorHolder.h b/dbms/src/Common/PtrHolder.h similarity index 61% rename from dbms/src/Flash/Executor/QueryExecutorHolder.h rename to dbms/src/Common/PtrHolder.h index bc3b050fa64..9be26f86b69 100644 --- a/dbms/src/Flash/Executor/QueryExecutorHolder.h +++ b/dbms/src/Common/PtrHolder.h @@ -14,40 +14,40 @@ #pragma once -#include - #include namespace DB { -class QueryExecutorHolder +template +class PtrHolder { public: - void set(QueryExecutorPtr && query_executor_) + void set(Ptr && obj_) { + assert(obj_); std::lock_guard lock(mu); - assert(!query_executor); - query_executor = std::move(query_executor_); + assert(!obj); + obj = std::move(obj_); } - std::optional tryGet() + auto tryGet() { - std::optional res; + std::optional res; std::lock_guard lock(mu); - if (query_executor != nullptr) - res.emplace(query_executor.get()); + if (obj != nullptr) + res.emplace(obj.get()); return res; } - QueryExecutor * operator->() + auto operator->() { std::lock_guard lock(mu); - assert(query_executor != nullptr); - return query_executor.get(); + assert(obj != nullptr); + return obj.get(); } private: std::mutex mu; - QueryExecutorPtr query_executor; + Ptr obj; }; } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index 22b3ee809bf..4e2d34f81ee 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h index 88d0c13cbc8..dd89398da34 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include diff --git 
a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index 524624dedd7..ae64d1c7c04 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -31,36 +31,71 @@ PipelineExecutor::PipelineExecutor( assert(root_pipeline); } -ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler) +void PipelineExecutor::scheduleEvents() { assert(root_pipeline); - // for !result_handler.isIgnored(), the sink plan of root_pipeline must be nullptr. - // TODO Now the result handler for batch cop introduces io blocking, we should find a better implementation of get result sink. - if (unlikely(!result_handler.isIgnored())) - root_pipeline->addGetResultSink(std::move(result_handler)); - + auto events = root_pipeline->toEvents(status, context, context.getMaxStreams()); + Events without_input_events; + for (const auto & event : events) { - auto events = root_pipeline->toEvents(status, context, context.getMaxStreams()); - Events without_input_events; - for (const auto & event : events) - { - if (event->withoutInput()) - without_input_events.push_back(event); - } - for (const auto & event : without_input_events) - event->schedule(); + if (event->withoutInput()) + without_input_events.push_back(event); } + for (const auto & event : without_input_events) + event->schedule(); +} +void PipelineExecutor::wait() +{ if (unlikely(context.isTest())) { // In test mode, a single query should take no more than 15 seconds to execute. - std::chrono::seconds timeout(15); + static std::chrono::seconds timeout(15); status.waitFor(timeout); } else { status.wait(); } +} + +void PipelineExecutor::consume(const ResultQueuePtr & result_queue, ResultHandler && result_handler) +{ + Block ret; + if (unlikely(context.isTest())) + { + // In test mode, a single query should take no more than 15 seconds to execute. + static std::chrono::seconds timeout(15); + while (result_queue->popTimeout(ret, timeout) == MPMCQueueResult::OK) + result_handler(ret); + } + else + { + while (result_queue->pop(ret) == MPMCQueueResult::OK) + result_handler(ret); + } +} + +ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler) +{ + if (result_handler.isIgnored()) + { + scheduleEvents(); + wait(); + } + else + { + /// ┌──get_result_sink + /// result_handler◄──result_queue◄──┼──get_result_sink + /// └──get_result_sink + + // The queue size is same as UnionBlockInputStream = concurrency * 5. 
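The shape of the hand-off that execute() sets up here: each get_result_sink pushes blocks into a bounded multi-producer/multi-consumer queue sized concurrency * 5, the calling thread pops them into result_handler, and the queue is finished once the last event finishes so the pop loop can end. A self-contained sketch with a simplified queue standing in for DB::MPMCQueue (the Block alias, sizes and thread roles below are placeholders):

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Stand-in for MPMCQueue<Block>: bounded, with finish() so that pop() returns
// false once producers are done and the queue is drained.
template <typename T>
class ResultQueueSketch
{
public:
    explicit ResultQueueSketch(size_t capacity_) : capacity(capacity_) {}

    void push(T v)
    {
        std::unique_lock<std::mutex> lock(mu);
        not_full.wait(lock, [&] { return items.size() < capacity; });
        items.push_back(std::move(v));
        not_empty.notify_one();
    }

    bool pop(T & out)
    {
        std::unique_lock<std::mutex> lock(mu);
        not_empty.wait(lock, [&] { return finished || !items.empty(); });
        if (items.empty())
            return false; // finished and fully drained
        out = std::move(items.front());
        items.pop_front();
        not_full.notify_one();
        return true;
    }

    void finish()
    {
        std::lock_guard<std::mutex> lock(mu);
        finished = true;
        not_empty.notify_all();
    }

private:
    std::mutex mu;
    std::condition_variable not_empty;
    std::condition_variable not_full;
    std::deque<T> items;
    const size_t capacity;
    bool finished = false;
};

int main()
{
    using Block = int; // placeholder for DB::Block
    const size_t concurrency = 4;
    ResultQueueSketch<Block> result_queue(concurrency * 5); // same sizing rule as the patch

    // get_result_sink side: one producer per pipeline thread.
    std::vector<std::thread> sinks;
    for (size_t s = 0; s < concurrency; ++s)
        sinks.emplace_back([&result_queue, s] {
            for (int i = 0; i < 10; ++i)
                result_queue.push(static_cast<int>(s) * 100 + i);
        });

    // "onEventFinish" side: finish the queue only after the last producer is done.
    std::thread finisher([&] {
        for (auto & sink : sinks)
            sink.join();
        result_queue.finish();
    });

    // consume() side: the calling thread drains the queue into the result handler.
    std::function<void(const Block &)> result_handler = [](const Block &) { /* forward to client */ };
    Block block;
    while (result_queue.pop(block))
        result_handler(block);

    finisher.join();
    return 0;
}
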
+ auto result_queue = status.registerResultQueue(/*queue_size=*/context.getMaxStreams() * 5); + assert(root_pipeline); + root_pipeline->addGetResultSink(result_queue); + scheduleEvents(); + consume(result_queue, std::move(result_handler)); + } return status.toExecutionResult(); } diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index 9ccd352db6b..cbc4f46f295 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -16,6 +16,7 @@ #include #include +#include namespace DB { @@ -69,6 +70,13 @@ class PipelineExecutor : public QueryExecutor protected: ExecutionResult execute(ResultHandler && result_handler) override; +private: + void scheduleEvents(); + + void wait(); + + void consume(const ResultQueuePtr & result_queue, ResultHandler && result_handler); + private: PipelinePtr root_pipeline; diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp index 471f9e96393..8f1adeee1ea 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp @@ -92,15 +92,30 @@ void PipelineExecutorStatus::onEventSchedule() noexcept void PipelineExecutorStatus::onEventFinish() noexcept { - std::lock_guard lock(mu); - assert(active_event_count > 0); - --active_event_count; - if (0 == active_event_count) - cv.notify_all(); + bool query_finished = false; + { + std::lock_guard lock(mu); + assert(active_event_count > 0); + --active_event_count; + if (0 == active_event_count) + { + cv.notify_all(); + query_finished = true; + } + } + if (query_finished && result_queue.has_value()) + (*result_queue)->finish(); } void PipelineExecutorStatus::cancel() noexcept { is_cancelled.store(true, std::memory_order_release); } + +ResultQueuePtr PipelineExecutorStatus::registerResultQueue(size_t queue_size) noexcept +{ + assert(!result_queue.has_value()); + result_queue.emplace(std::make_shared(queue_size)); + return *result_queue; +} } // namespace DB diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.h b/dbms/src/Flash/Executor/PipelineExecutorStatus.h index 0892751acf4..0a51457ce8f 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.h +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -69,11 +70,13 @@ class PipelineExecutorStatus : private boost::noncopyable void cancel() noexcept; - bool isCancelled() noexcept + ALWAYS_INLINE bool isCancelled() noexcept { return is_cancelled.load(std::memory_order_acquire); } + ResultQueuePtr registerResultQueue(size_t queue_size) noexcept; + private: bool setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept; @@ -86,5 +89,10 @@ class PipelineExecutorStatus : private boost::noncopyable UInt32 active_event_count{0}; std::atomic_bool is_cancelled{false}; + + // `result_queue.finish` can only be called in `onEventFinish` because `result_queue.pop` cannot end until events end. + // `registerResultQueue` is called before event scheduled, so is safe to use result_queue without lock. + // If `registerResultQueue` is called, `result_queue` must be safely visible in `onEventFinish`. 
+ std::optional result_queue; }; } // namespace DB diff --git a/dbms/src/Flash/Executor/QueryExecutor.h b/dbms/src/Flash/Executor/QueryExecutor.h index 9c90d2e968d..2a7f830682f 100644 --- a/dbms/src/Flash/Executor/QueryExecutor.h +++ b/dbms/src/Flash/Executor/QueryExecutor.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include @@ -69,4 +70,5 @@ class QueryExecutor }; using QueryExecutorPtr = std::unique_ptr; +using QueryExecutorHolder = PtrHolder; } // namespace DB diff --git a/dbms/src/Flash/Executor/ResultQueue.h b/dbms/src/Flash/Executor/ResultQueue.h new file mode 100644 index 00000000000..faa6347640e --- /dev/null +++ b/dbms/src/Flash/Executor/ResultQueue.h @@ -0,0 +1,27 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include + +namespace DB +{ +using ResultQueue = MPMCQueue; +using ResultQueuePtr = std::shared_ptr; +} // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 5752159f618..8624354aa80 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -405,10 +405,18 @@ void MPPTask::runImpl() auto result = query_executor_holder->execute(); if (likely(result.is_success)) { - // finish receiver - receiver_set->close(); + /// Need to finish writing before closing the receiver. + /// For example, for the query with limit, calling `finishWrite` first to ensure that the limit executor on the TiDB side can end normally, + /// otherwise the upstream MPPTasks will fail because of the closed receiver and then passing the error to TiDB. 
+ /// + /// ┌──tiflash(limit)◄─┬─tiflash(no limit) + /// tidb(limit)◄──┼──tiflash(limit)◄─┼─tiflash(no limit) + /// └──tiflash(limit)◄─┴─tiflash(no limit) + // finish MPPTunnel finishWrite(); + // finish receiver + receiver_set->close(); } auto ru = query_executor_holder->collectRequestUnit(); LOG_INFO(log, "mpp finish with request unit: {}", ru); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 302b64fff87..21049644a3e 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include diff --git a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp index 7bcaf23210a..8bfb0abf06a 100644 --- a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp +++ b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp @@ -23,6 +23,41 @@ namespace DB::tests { +namespace +{ +class SimpleGetResultSinkOp : public SinkOp +{ +public: + SimpleGetResultSinkOp( + PipelineExecutorStatus & exec_status_, + const String & req_id, + ResultHandler result_handler_) + : SinkOp(exec_status_, req_id) + , result_handler(std::move(result_handler_)) + { + assert(!result_handler.isIgnored()); + } + + String getName() const override + { + return "SimpleGetResultSinkOp"; + } + +protected: + OperatorStatus writeImpl(Block && block) override + { + if (!block) + return OperatorStatus::FINISHED; + + result_handler(block); + return OperatorStatus::NEED_INPUT; + } + +private: + ResultHandler result_handler; +}; +} // namespace + class SimpleOperatorTestRunner : public DB::tests::ExecutorTest { public: @@ -59,14 +94,17 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest PhysicalPlan physical_plan{*context.context, ""}; physical_plan.build(request.get()); - assert(!result_handler.isIgnored()); - auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), Logger::get(), physical_plan.outputAndOptimize()); + auto plan_tree = physical_plan.outputAndOptimize(); PipelineExecGroupBuilder group_builder; PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) { assert(plan); plan->buildPipelineExecGroup(exec_status, group_builder, *context.context, /*concurrency=*/1); }); + assert(group_builder.concurrency == 1); + group_builder.transform([&](auto & builder) { + builder.setSinkOp(std::make_unique(exec_status, "", result_handler)); + }); auto result = group_builder.build(); assert(result.size() == 1); return std::move(result.back()); diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 21a4e0cb655..de8af9d7c76 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -153,10 +153,10 @@ void Pipeline::toTreeString(FmtBuffer & buffer, size_t level) const child->toTreeString(buffer, level); } -void Pipeline::addGetResultSink(ResultHandler && result_handler) +void Pipeline::addGetResultSink(const ResultQueuePtr & result_queue) { assert(!plan_nodes.empty()); - auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), log, plan_nodes.back()); + auto get_result_sink = PhysicalGetResultSink::build(result_queue, log, plan_nodes.back()); addPlanNode(get_result_sink); } diff --git a/dbms/src/Flash/Pipeline/Pipeline.h b/dbms/src/Flash/Pipeline/Pipeline.h index 2f9a4a394bf..24cc3dd7910 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.h +++ b/dbms/src/Flash/Pipeline/Pipeline.h @@ -15,6 +15,7 @@ 
#pragma once #include +#include #include #include @@ -65,7 +66,7 @@ class Pipeline : public std::enable_shared_from_this void toTreeString(FmtBuffer & buffer, size_t level = 0) const; // used for getting the result blocks. - void addGetResultSink(ResultHandler && result_handler); + void addGetResultSink(const ResultQueuePtr & result_queue); PipelineExecGroup buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp index d7a81162a01..32fc803a30c 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp @@ -19,7 +19,7 @@ namespace DB { PhysicalPlanNodePtr PhysicalGetResultSink::build( - ResultHandler && result_handler, + const ResultQueuePtr & result_queue, const LoggerPtr & log, const PhysicalPlanNodePtr & child) { @@ -29,7 +29,7 @@ PhysicalPlanNodePtr PhysicalGetResultSink::build( child->getFineGrainedShuffle(), log->identifier(), child, - std::move(result_handler)); + result_queue); } void PhysicalGetResultSink::buildPipelineExecGroup( @@ -38,9 +38,8 @@ void PhysicalGetResultSink::buildPipelineExecGroup( Context & /*context*/, size_t /*concurrency*/) { - auto this_shared_ptr = std::static_pointer_cast(shared_from_this()); group_builder.transform([&](auto & builder) { - builder.setSinkOp(std::make_unique(exec_status, log->identifier(), this_shared_ptr)); + builder.setSinkOp(std::make_unique(exec_status, log->identifier(), result_queue)); }); } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h index 5e17d2ce269..096bf4ead66 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h @@ -14,18 +14,16 @@ #pragma once -#include +#include #include namespace DB { -class GetResultSinkOp; - class PhysicalGetResultSink : public PhysicalUnary { public: static PhysicalPlanNodePtr build( - ResultHandler && result_handler, + const ResultQueuePtr & result_queue, const LoggerPtr & log, const PhysicalPlanNodePtr & child); @@ -35,11 +33,11 @@ class PhysicalGetResultSink : public PhysicalUnary const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, - ResultHandler && result_handler_) + const ResultQueuePtr & result_queue_) : PhysicalUnary(executor_id_, PlanType::GetResult, schema_, fine_grained_shuffle_, req_id, child_) - , result_handler(std::move(result_handler_)) + , result_queue(result_queue_) { - assert(!result_handler.isIgnored()); + assert(result_queue); } void finalize(const Names &) override @@ -59,10 +57,7 @@ class PhysicalGetResultSink : public PhysicalUnary size_t /*concurrency*/) override; private: - friend class GetResultSinkOp; - - std::mutex mu; - ResultHandler result_handler; + ResultQueuePtr result_queue; private: void buildBlockInputStreamImpl(DAGPipeline &, Context &, size_t) override diff --git a/dbms/src/Operators/GetResultSinkOp.cpp b/dbms/src/Operators/GetResultSinkOp.cpp index d152a3511da..ce4e2f022f2 100644 --- a/dbms/src/Operators/GetResultSinkOp.cpp +++ b/dbms/src/Operators/GetResultSinkOp.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
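The GetResultSinkOp changes in the hunks below boil down to a small non-blocking state machine: try to push the block; if the queue is full, keep the block aside and report WAITING so the task is parked instead of blocking a thread, then let await() retry the push. A standalone sketch of that pattern, with the queue, Block and status enums reduced to stand-ins:

#include <optional>
#include <queue>
#include <string>

enum class PushResult { OK, FULL, CANCELLED };
enum class OpStatus { NEED_INPUT, WAITING, FINISHED };

// Stand-in for the bounded result queue's non-blocking tryPush.
struct TinyQueue
{
    size_t capacity = 2;
    std::queue<std::string> blocks;

    PushResult tryPush(std::string && block)
    {
        if (blocks.size() >= capacity)
            return PushResult::FULL; // note: block is NOT consumed in this case
        blocks.push(std::move(block));
        return PushResult::OK;
    }
};

// The pattern of the hunks below: never block the task thread on a full queue.
struct GetResultSinkSketch
{
    explicit GetResultSinkSketch(TinyQueue & queue_) : queue(queue_) {}

    OpStatus write(std::string && block)
    {
        if (block.empty()) // empty block = end of stream in this sketch
            return OpStatus::FINISHED;
        switch (queue.tryPush(std::move(block)))
        {
        case PushResult::OK:
            return OpStatus::NEED_INPUT;
        case PushResult::FULL:
            stashed.emplace(std::move(block)); // safe: tryPush did not move it on FULL
            return OpStatus::WAITING; // scheduler will poll await() later
        default:
            return OpStatus::FINISHED;
        }
    }

    OpStatus await()
    {
        if (!stashed)
            return OpStatus::NEED_INPUT;
        switch (queue.tryPush(std::move(*stashed)))
        {
        case PushResult::OK:
            stashed.reset();
            return OpStatus::NEED_INPUT;
        case PushResult::FULL:
            return OpStatus::WAITING;
        default:
            return OpStatus::FINISHED;
        }
    }

private:
    TinyQueue & queue;
    std::optional<std::string> stashed;
};
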
-#include #include namespace DB @@ -22,8 +21,42 @@ OperatorStatus GetResultSinkOp::writeImpl(Block && block) if (!block) return OperatorStatus::FINISHED; - std::lock_guard lock(physical_sink->mu); - physical_sink->result_handler(block); - return OperatorStatus::NEED_INPUT; + assert(!t_block); + auto ret = result_queue->tryPush(std::move(block)); + switch (ret) + { + case MPMCQueueResult::OK: + return OperatorStatus::NEED_INPUT; + case MPMCQueueResult::FULL: + // If returning Full, the block was not actually moved. + assert(block); // NOLINT(bugprone-use-after-move) + t_block.emplace(std::move(block)); // NOLINT(bugprone-use-after-move) + return OperatorStatus::WAITING; + default: + return OperatorStatus::FINISHED; + } +} + +OperatorStatus GetResultSinkOp::prepareImpl() +{ + return awaitImpl(); +} + +OperatorStatus GetResultSinkOp::awaitImpl() +{ + if (!t_block) + return OperatorStatus::NEED_INPUT; + + auto ret = result_queue->tryPush(std::move(*t_block)); + switch (ret) + { + case MPMCQueueResult::OK: + t_block.reset(); + return OperatorStatus::NEED_INPUT; + case MPMCQueueResult::FULL: + return OperatorStatus::WAITING; + default: + return OperatorStatus::FINISHED; + } } } // namespace DB diff --git a/dbms/src/Operators/GetResultSinkOp.h b/dbms/src/Operators/GetResultSinkOp.h index d5f4aab4dac..51fe96ee978 100644 --- a/dbms/src/Operators/GetResultSinkOp.h +++ b/dbms/src/Operators/GetResultSinkOp.h @@ -14,13 +14,11 @@ #pragma once -#include +#include #include namespace DB { -class PhysicalGetResultSink; -using PhysicalGetResultSinkPtr = std::shared_ptr; // The sink operator for getting the execution results. class GetResultSinkOp : public SinkOp { @@ -28,11 +26,11 @@ class GetResultSinkOp : public SinkOp GetResultSinkOp( PipelineExecutorStatus & exec_status_, const String & req_id, - const PhysicalGetResultSinkPtr & physical_sink_) + const ResultQueuePtr & result_queue_) : SinkOp(exec_status_, req_id) - , physical_sink(physical_sink_) + , result_queue(result_queue_) { - assert(physical_sink); + assert(result_queue); } String getName() const override @@ -43,7 +41,12 @@ class GetResultSinkOp : public SinkOp protected: OperatorStatus writeImpl(Block && block) override; + OperatorStatus prepareImpl() override; + + OperatorStatus awaitImpl() override; + private: - PhysicalGetResultSinkPtr physical_sink; + ResultQueuePtr result_queue; + std::optional t_block; }; } // namespace DB From 23c43aca58f86311bcb011a95230e8503cee1219 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 6 Apr 2023 13:46:57 +0800 Subject: [PATCH 3/4] Use thread pool to optimize loadMetadata and initStores in tiflash starts (#6979) ref pingcap/tiflash#7188 --- dbms/src/Common/UniThreadPool.h | 79 +++++++++++++ dbms/src/Databases/DatabaseMemory.cpp | 2 +- dbms/src/Databases/DatabaseMemory.h | 2 +- dbms/src/Databases/DatabaseOrdinary.cpp | 51 +++++++-- dbms/src/Databases/DatabaseOrdinary.h | 2 +- dbms/src/Databases/DatabaseTiFlash.cpp | 45 ++++++-- dbms/src/Databases/DatabaseTiFlash.h | 2 +- dbms/src/Databases/DatabasesCommon.cpp | 105 ++---------------- dbms/src/Databases/DatabasesCommon.h | 22 ++-- dbms/src/Databases/IDatabase.h | 4 +- dbms/src/Databases/test/gtest_database.cpp | 4 +- dbms/src/IO/IOThreadPools.h | 2 - .../src/Interpreters/InterpreterCreateQuery.h | 6 +- dbms/src/Interpreters/Settings.h | 1 + dbms/src/Interpreters/loadMetadata.cpp | 52 +++++++-- dbms/src/Server/BgStorageInit.cpp | 30 ++++- dbms/src/Server/Server.cpp | 6 +- 17 files changed, 261 insertions(+), 154 deletions(-) diff --git 
a/dbms/src/Common/UniThreadPool.h b/dbms/src/Common/UniThreadPool.h index 8f302dfcb05..a83b9aafeb7 100644 --- a/dbms/src/Common/UniThreadPool.h +++ b/dbms/src/Common/UniThreadPool.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -23,7 +24,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -31,6 +34,8 @@ namespace DB { +template +class ThreadPoolWaitGroup; /** Very simple thread pool similar to boost::threadpool. * Advantages: * - catches exceptions and rethrows on wait. @@ -93,6 +98,11 @@ class ThreadPoolImpl void setQueueSize(size_t value); size_t getMaxThreads() const; + std::unique_ptr> waitGroup() + { + return std::make_unique>(*this); + } + private: mutable std::mutex mutex; std::condition_variable job_finished; @@ -284,6 +294,75 @@ class ThreadFromGlobalPoolImpl : boost::noncopyable } }; +/// ThreadPoolWaitGroup is used to wait all the task launched here to finish +/// To guarantee the exception safty of ThreadPoolWaitGroup, we need to create object, do schedule and wait in the same scope. +template +class ThreadPoolWaitGroup +{ +public: + explicit ThreadPoolWaitGroup(ThreadPoolImpl & thread_pool_) + : thread_pool(thread_pool_) + {} + ThreadPoolWaitGroup(const ThreadPoolWaitGroup &) = delete; + ~ThreadPoolWaitGroup() + { + try + { + wait(); + } + catch (...) + { + tryLogCurrentException(Logger::get(), "Error in destructor function of ThreadPoolWaitGroup"); + } + } + + void schedule(std::function func) + { + auto task = std::make_shared>(func); + thread_pool.scheduleOrThrowOnError([task] { (*task)(); }); + futures.emplace_back(task->get_future()); + } + + void wait() + { + if (consumed) + return; + consumed = true; + + std::exception_ptr first_exception; + for (auto & future : futures) + { + // ensure all futures finished + try + { + future.get(); + } + catch (...) + { + if (!first_exception) + first_exception = std::current_exception(); + } + } + + if (first_exception) + { + try + { + std::rethrow_exception(first_exception); + } + catch (Exception & exc) + { + exc.addMessage(exc.getStackTrace().toString()); + exc.rethrow(); + } + } + } + +private: + std::vector> futures; + ThreadPoolImpl & thread_pool; + bool consumed = false; +}; /// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context. /// /// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way, diff --git a/dbms/src/Databases/DatabaseMemory.cpp b/dbms/src/Databases/DatabaseMemory.cpp index 5f275f6abd8..7a17484681d 100644 --- a/dbms/src/Databases/DatabaseMemory.cpp +++ b/dbms/src/Databases/DatabaseMemory.cpp @@ -31,7 +31,7 @@ DatabaseMemory::DatabaseMemory(String name_) void DatabaseMemory::loadTables( Context & /*context*/, - legacy::ThreadPool * /*thread_pool*/, + ThreadPool * /*thread_pool*/, bool /*has_force_restore_data_flag*/) { /// Nothing to load. 
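A short usage sketch for the ThreadPoolWaitGroup added above. It compiles only inside the TiFlash tree since it uses the pool from this header (assuming the usual DB::ThreadPool alias lives in namespace DB, as this header does); the loading function, pool size and file list are placeholders:

#include <Common/UniThreadPool.h>

#include <atomic>
#include <string>
#include <vector>

// Placeholder for real per-file work; any exception it throws is captured by
// the wait group and rethrown from wait().
void loadOneMetadataFile(const std::string & /*file*/) {}

void loadAllInParallel(const std::vector<std::string> & files)
{
    // Create the pool, schedule and wait in the same scope, as the comment on
    // ThreadPoolWaitGroup above requires for exception safety.
    DB::ThreadPool pool(/*max_threads=*/8);
    auto wait_group = pool.waitGroup();

    std::atomic<size_t> done{0};
    for (const auto & file : files)
        wait_group->schedule([&done, &file] {
            loadOneMetadataFile(file);
            ++done;
        });

    wait_group->wait(); // rethrows the first exception thrown by any task
}
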
diff --git a/dbms/src/Databases/DatabaseMemory.h b/dbms/src/Databases/DatabaseMemory.h index d8a458267f9..09e0e31ca11 100644 --- a/dbms/src/Databases/DatabaseMemory.h +++ b/dbms/src/Databases/DatabaseMemory.h @@ -40,7 +40,7 @@ class DatabaseMemory : public DatabaseWithOwnTablesBase void loadTables( Context & context, - legacy::ThreadPool * thread_pool, + ThreadPool * thread_pool, bool has_force_restore_data_flag) override; void createTable( diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 7a06e186889..72e158b6bb6 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -29,11 +30,9 @@ #include #include #include -#include #include #include - namespace DB { namespace ErrorCodes @@ -46,6 +45,7 @@ extern const int FILE_DOESNT_EXIST; extern const int LOGICAL_ERROR; extern const int CANNOT_GET_CREATE_TABLE_QUERY; extern const int SYNTAX_ERROR; +extern const int TIDB_TABLE_ALREADY_EXISTS; } // namespace ErrorCodes namespace FailPoints @@ -83,7 +83,7 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, } -void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag) +void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) { using FileNames = std::vector; FileNames file_names = DatabaseLoading::listSQLFilenames(metadata_path, log); @@ -102,10 +102,15 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread AtomicStopwatch watch; std::atomic tables_processed{0}; + auto wait_group = thread_pool ? thread_pool->waitGroup() : nullptr; + + std::mutex failed_tables_mutex; + Tables tables_failed_to_startup; + auto task_function = [&](FileNames::const_iterator begin, FileNames::const_iterator end) { for (auto it = begin; it != end; ++it) { - const String & table = *it; + const String & table_file = *it; /// Messages, so that it's not boring to wait for the server to load for a long time. if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) @@ -114,7 +119,32 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread watch.restart(); } - DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table, has_force_restore_data_flag); + auto [table_name, table] = DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table_file, has_force_restore_data_flag); + + /// After table was basically initialized, startup it. + if (table) + { + try + { + table->startup(); + } + catch (DB::Exception & e) + { + if (e.code() == ErrorCodes::TIDB_TABLE_ALREADY_EXISTS) + { + // While doing IStorage::startup, Exception thorwn with TIDB_TABLE_ALREADY_EXISTS, + // means that we may crashed in the middle of renaming tables. We clean the meta file + // for those storages by `cleanupTables`. + // - If the storage is the outdated one after renaming, remove it is right. + // - If the storage should be the target table, remove it means we "rollback" the + // rename action. And the table will be renamed by TiDBSchemaSyncer later. 
+ std::lock_guard lock(failed_tables_mutex); + tables_failed_to_startup.emplace(table_name, table); + } + else + throw; + } + } } }; @@ -126,21 +156,20 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread auto begin = file_names.begin() + i * bunch_size; auto end = (i + 1 == num_bunches) ? file_names.end() : (file_names.begin() + (i + 1) * bunch_size); - auto task = [task_function, begin, end] { - return task_function(begin, end); + auto task = [&task_function, begin, end] { + task_function(begin, end); }; if (thread_pool) - thread_pool->schedule(task); + wait_group->schedule(task); else task(); } if (thread_pool) - thread_pool->wait(); + wait_group->wait(); - /// After all tables was basically initialized, startup them. - DatabaseLoading::startupTables(*this, name, tables, thread_pool, log); + DatabaseLoading::cleanupTables(*this, name, tables_failed_to_startup, log); } void DatabaseOrdinary::createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query) diff --git a/dbms/src/Databases/DatabaseOrdinary.h b/dbms/src/Databases/DatabaseOrdinary.h index d0f80f89f91..50209729235 100644 --- a/dbms/src/Databases/DatabaseOrdinary.h +++ b/dbms/src/Databases/DatabaseOrdinary.h @@ -32,7 +32,7 @@ class DatabaseOrdinary : public DatabaseWithOwnTablesBase void loadTables( Context & context, - legacy::ThreadPool * thread_pool, + ThreadPool * thread_pool, bool has_force_restore_data_flag) override; void createTable( diff --git a/dbms/src/Databases/DatabaseTiFlash.cpp b/dbms/src/Databases/DatabaseTiFlash.cpp index e588f077101..cce9a7d7781 100644 --- a/dbms/src/Databases/DatabaseTiFlash.cpp +++ b/dbms/src/Databases/DatabaseTiFlash.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -31,7 +32,6 @@ #include #include #include -#include #include namespace DB @@ -46,6 +46,7 @@ extern const int FILE_DOESNT_EXIST; extern const int LOGICAL_ERROR; extern const int CANNOT_GET_CREATE_TABLE_QUERY; extern const int SYNTAX_ERROR; +extern const int TIDB_TABLE_ALREADY_EXISTS; } // namespace ErrorCodes namespace FailPoints @@ -105,7 +106,7 @@ static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256; static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; static constexpr size_t TABLES_PARALLEL_LOAD_BUNCH_SIZE = 100; -void DatabaseTiFlash::loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag) +void DatabaseTiFlash::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) { using FileNames = std::vector; FileNames table_files = DatabaseLoading::listSQLFilenames(getMetadataPath(), log); @@ -122,6 +123,11 @@ void DatabaseTiFlash::loadTables(Context & context, legacy::ThreadPool * thread_ AtomicStopwatch watch; std::atomic tables_processed{0}; + auto wait_group = thread_pool ? 
thread_pool->waitGroup() : nullptr; + + std::mutex failed_tables_mutex; + Tables tables_failed_to_startup; + auto task_function = [&](std::vector::const_iterator begin, std::vector::const_iterator end) { for (auto it = begin; it != end; ++it) { @@ -133,7 +139,7 @@ void DatabaseTiFlash::loadTables(Context & context, legacy::ThreadPool * thread_ } const String & table_file = *it; - DatabaseLoading::loadTable( + auto [table_name, table] = DatabaseLoading::loadTable( context, *this, metadata_path, @@ -142,6 +148,31 @@ void DatabaseTiFlash::loadTables(Context & context, legacy::ThreadPool * thread_ getEngineName(), table_file, has_force_restore_data_flag); + + /// After table was basically initialized, startup it. + if (table) + { + try + { + table->startup(); + } + catch (DB::Exception & e) + { + if (e.code() == ErrorCodes::TIDB_TABLE_ALREADY_EXISTS) + { + // While doing IStorage::startup, Exception thorwn with TIDB_TABLE_ALREADY_EXISTS, + // means that we may crashed in the middle of renaming tables. We clean the meta file + // for those storages by `cleanupTables`. + // - If the storage is the outdated one after renaming, remove it is right. + // - If the storage should be the target table, remove it means we "rollback" the + // rename action. And the table will be renamed by TiDBSchemaSyncer later. + std::lock_guard lock(failed_tables_mutex); + tables_failed_to_startup.emplace(table_name, table); + } + else + throw; + } + } } }; @@ -155,17 +186,17 @@ void DatabaseTiFlash::loadTables(Context & context, legacy::ThreadPool * thread_ auto task = [&task_function, begin, end] { task_function(begin, end); }; + if (thread_pool) - thread_pool->schedule(task); + wait_group->schedule(task); else task(); } if (thread_pool) - thread_pool->wait(); + wait_group->wait(); - // After all tables was basically initialized, startup them. 
- DatabaseLoading::startupTables(*this, name, tables, thread_pool, log); + DatabaseLoading::cleanupTables(*this, name, tables_failed_to_startup, log); } diff --git a/dbms/src/Databases/DatabaseTiFlash.h b/dbms/src/Databases/DatabaseTiFlash.h index 5e45f571c13..4c0f7ca3443 100644 --- a/dbms/src/Databases/DatabaseTiFlash.h +++ b/dbms/src/Databases/DatabaseTiFlash.h @@ -37,7 +37,7 @@ class DatabaseTiFlash : public DatabaseWithOwnTablesBase String getEngineName() const override { return "TiFlash"; } - void loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag) override; + void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override; void createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query) override; diff --git a/dbms/src/Databases/DatabasesCommon.cpp b/dbms/src/Databases/DatabasesCommon.cpp index 1e2018e88e0..93ec7fb390a 100644 --- a/dbms/src/Databases/DatabasesCommon.cpp +++ b/dbms/src/Databases/DatabasesCommon.cpp @@ -24,13 +24,11 @@ #include #include #include -#include #include #include #include - namespace DB { namespace ErrorCodes @@ -42,7 +40,6 @@ extern const int LOGICAL_ERROR; extern const int INCORRECT_FILE_NAME; extern const int CANNOT_CREATE_TABLE_FROM_METADATA; extern const int SYNTAX_ERROR; -extern const int TIDB_TABLE_ALREADY_EXISTS; } // namespace ErrorCodes @@ -282,14 +279,14 @@ std::vector listSQLFilenames(const String & meta_dir, Poco::Logger * log return filenames; } -void loadTable(Context & context, - IDatabase & database, - const String & database_metadata_path, - const String & database_name, - const String & database_data_path, - const String & database_engine, - const String & file_name, - bool has_force_restore_data_flag) +std::tuple loadTable(Context & context, + IDatabase & database, + const String & database_metadata_path, + const String & database_name, + const String & database_data_path, + const String & database_engine, + const String & file_name, + bool has_force_restore_data_flag) { Poco::Logger * log = &Poco::Logger::get("loadTable"); const String table_metadata_path = database_metadata_path + (endsWith(database_metadata_path, "/") ? "" : "/") + file_name; @@ -307,7 +304,7 @@ void loadTable(Context & context, { LOG_ERROR(log, "File {} is empty. 
Removing.", table_metadata_path); Poco::File(table_metadata_path).remove(); - return; + return std::make_tuple("", nullptr); } try @@ -326,6 +323,7 @@ void loadTable(Context & context, std::tie(table_name, table) = createTableFromDefinition(statement, database_name, database_data_path, database_engine, context, has_force_restore_data_flag, "in file " + table_metadata_path); } database.attachTable(table_name, table); + return std::make_tuple(table_name, table); } catch (const Exception & e) { @@ -336,12 +334,9 @@ void loadTable(Context & context, e.getStackTrace().toString()), ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA); } -} - -static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256; -static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; -static constexpr size_t TABLES_PARALLEL_LOAD_BUNCH_SIZE = 100; + return std::make_tuple("", nullptr); +} void cleanupTables(IDatabase & database, const String & db_name, const Tables & tables, Poco::Logger * log) { @@ -361,82 +356,6 @@ void cleanupTables(IDatabase & database, const String & db_name, const Tables & database.detachTable(table_name); } } - -void startupTables(IDatabase & database, const String & db_name, Tables & tables, legacy::ThreadPool * thread_pool, Poco::Logger * log) -{ - LOG_INFO(log, "Starting up {} tables.", tables.size()); - - AtomicStopwatch watch; - std::atomic tables_processed{0}; - size_t total_tables = tables.size(); - - std::mutex failed_tables_mutex; - Tables tables_failed_to_startup; - - auto task_function = [&](Tables::iterator begin, Tables::iterator end) { - for (auto it = begin; it != end; ++it) - { - if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) - { - LOG_INFO(log, "{:.2f}%", tables_processed * 100.0 / total_tables); - watch.restart(); - } - - try - { - it->second->startup(); - } - catch (DB::Exception & e) - { - if (e.code() == ErrorCodes::TIDB_TABLE_ALREADY_EXISTS) - { - // While doing IStorage::startup, Exception thorwn with TIDB_TABLE_ALREADY_EXISTS, - // means that we may crashed in the middle of renaming tables. We clean the meta file - // for those storages by `cleanupTables`. - // - If the storage is the outdated one after renaming, remove it is right. - // - If the storage should be the target table, remove it means we "rollback" the - // rename action. And the table will be renamed by TiDBSchemaSyncer later. 
- std::lock_guard lock(failed_tables_mutex); - tables_failed_to_startup.emplace(it->first, it->second); - } - else - throw; - } - } - }; - - const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE; - size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size; - - auto begin = tables.begin(); - for (size_t i = 0; i < num_bunches; ++i) - { - auto end = begin; - - if (i + 1 == num_bunches) - end = tables.end(); - else - std::advance(end, bunch_size); - - auto task = [&task_function, begin, end] { - task_function(begin, end); - }; - - if (thread_pool) - thread_pool->schedule(task); - else - task(); - - begin = end; - } - - if (thread_pool) - thread_pool->wait(); - - // Cleanup to asure the atomic of renaming - cleanupTables(database, db_name, tables_failed_to_startup, log); -} - } // namespace DatabaseLoading } // namespace DB diff --git a/dbms/src/Databases/DatabasesCommon.h b/dbms/src/Databases/DatabasesCommon.h index eb356142679..c5a01df382f 100644 --- a/dbms/src/Databases/DatabasesCommon.h +++ b/dbms/src/Databases/DatabasesCommon.h @@ -63,18 +63,16 @@ ASTPtr getCreateQueryFromMetadata(const Context & context, const String & metada std::vector listSQLFilenames(const String & meta_dir, Poco::Logger * log); -// Startup tables with thread_pool. If exception with code TIDB_TABLE_ALREADY_EXISTS thrown in startup, -// those tables' meta will be removed and deatch from database. -void startupTables(IDatabase & database, const String & db_name, Tables & tables, legacy::ThreadPool * thread_pool, Poco::Logger * log); - -void loadTable(Context & context, - IDatabase & database, - const String & database_metadata_path, - const String & database_name, - const String & database_data_path, - const String & database_engine, - const String & file_name, - bool has_force_restore_data_flag); +void cleanupTables(IDatabase & database, const String & db_name, const Tables & tables, Poco::Logger * log); + +std::tuple loadTable(Context & context, + IDatabase & database, + const String & database_metadata_path, + const String & database_name, + const String & database_data_path, + const String & database_engine, + const String & file_name, + bool has_force_restore_data_flag); } // namespace DatabaseLoading diff --git a/dbms/src/Databases/IDatabase.h b/dbms/src/Databases/IDatabase.h index 801eb9d107a..c42867e885d 100644 --- a/dbms/src/Databases/IDatabase.h +++ b/dbms/src/Databases/IDatabase.h @@ -14,11 +14,11 @@ #pragma once +#include #include #include #include #include -#include #include #include @@ -72,7 +72,7 @@ class IDatabase : public std::enable_shared_from_this /// Load a set of existing tables. If thread_pool is specified, use it. /// You can call only once, right after the object is created. - virtual void loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag) = 0; + virtual void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) = 0; /// Check the existence of the table. virtual bool isTableExist(const Context & context, const String & name) const = 0; diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index c967397e207..4387eee79e4 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -13,6 +13,7 @@ // limitations under the License. 
#include +#include #include #include #include @@ -31,7 +32,6 @@ #include #include #include -#include #include #include @@ -515,7 +515,7 @@ try { // If we loadTable for db2, new table meta should be removed. - legacy::ThreadPool thread_pool(2); + ThreadPool thread_pool(2); db2->loadTables(*ctx, &thread_pool, true); Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); diff --git a/dbms/src/IO/IOThreadPools.h b/dbms/src/IO/IOThreadPools.h index 5b68a444bfc..07b9a088ab4 100644 --- a/dbms/src/IO/IOThreadPools.h +++ b/dbms/src/IO/IOThreadPools.h @@ -45,6 +45,4 @@ using DataStoreS3Pool = IOThreadPool; using S3FileCachePool = IOThreadPool; using RNRemoteReadTaskPool = IOThreadPool; using RNPagePreparerPool = IOThreadPool; - - } // namespace DB diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index 05dc951a85b..0ca8ea01c2e 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -14,9 +14,9 @@ #pragma once +#include #include #include -#include namespace DB @@ -42,7 +42,7 @@ class InterpreterCreateQuery : public IInterpreter static ASTPtr formatColumns(const NamesAndTypesList & columns); static ASTPtr formatColumns(const ColumnsDescription & columns); - void setDatabaseLoadingThreadpool(legacy::ThreadPool & thread_pool_) + void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_) { thread_pool = &thread_pool_; } @@ -73,7 +73,7 @@ class InterpreterCreateQuery : public IInterpreter Context & context; /// Using while loading database. - legacy::ThreadPool * thread_pool = nullptr; + ThreadPool * thread_pool = nullptr; /// Skip safety threshold when loading tables. bool has_force_restore_data_flag = false; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 2172aed73bb..7e896d7b097 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -238,6 +238,7 @@ struct Settings M(SettingUInt64, dt_small_file_size_threshold, 128 * 1024, "When S3 is enabled, file size less than dt_small_file_size_threshold will be merged before uploading to S3") \ M(SettingDouble, dt_merged_file_max_size, 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \ M(SettingDouble, io_thread_count_scale, 5.0, "Number of thread of IOThreadPool = number of logical cpu cores * io_thread_count_scale. Only has meaning at server startup.") \ + M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. 
It just works for thread pool for initStores and loadMetadata") \ \ M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \ M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \ diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp index 0f2fe10e4c4..3010cb70318 100644 --- a/dbms/src/Interpreters/loadMetadata.cpp +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -42,7 +43,7 @@ static void executeCreateQuery(const String & query, Context & context, const String & database, const String & file_name, - legacy::ThreadPool * pool, + ThreadPool * pool, bool has_force_restore_data_flag) { ParserCreateQuery parser; @@ -60,19 +61,17 @@ static void executeCreateQuery(const String & query, interpreter.execute(); } - #define SYSTEM_DATABASE "system" static void loadDatabase( Context & context, const String & database, const String & database_metadata_file, - legacy::ThreadPool * thread_pool, + ThreadPool * thread_pool, bool force_restore_data) { /// There may exist .sql file with database creation statement. /// Or, if it is absent, then database with default engine is created. - String database_attach_query; if (Poco::File(database_metadata_file).exists()) @@ -89,7 +88,6 @@ static void loadDatabase( executeCreateQuery(database_attach_query, context, database, database_metadata_file, thread_pool, force_restore_data); } - void loadMetadata(Context & context) { const String path = context.getPath() + "metadata/"; @@ -102,8 +100,6 @@ void loadMetadata(Context & context) Poco::File force_restore_data_flag_file(context.getFlagsPath() + "force_restore_data"); bool has_force_restore_data_flag = force_restore_data_flag_file.exists(); - /// For parallel tables loading. - legacy::ThreadPool thread_pool(SettingMaxThreads().getAutoValue()); Poco::Logger * log = &Poco::Logger::get("loadMetadata"); /// Loop over databases sql files. This ensure filename ends with ".sql". @@ -138,10 +134,46 @@ void loadMetadata(Context & context) } } - for (const auto & [db_name, meta_file] : databases) - loadDatabase(context, db_name, meta_file, &thread_pool, has_force_restore_data_flag); - thread_pool.wait(); + auto load_database = [&](Context & context, const String & database, const String & database_metadata_file, ThreadPool * thread_pool, bool force_restore_data) { + /// There may exist .sql file with database creation statement. + /// Or, if it is absent, then database with default engine is created. 
+ String database_attach_query; + if (Poco::File(database_metadata_file).exists()) + { + ReadBufferFromFileProvider in(context.getFileProvider(), database_metadata_file, EncryptionPath(database_metadata_file, ""), 1024); + readStringUntilEOF(database_attach_query, in); + } + else + { + // Old fashioned way, keep engine as "Ordinary" + database_attach_query = "ATTACH DATABASE " + backQuoteIfNeed(database) + " ENGINE=Ordinary"; + } + + executeCreateQuery(database_attach_query, context, database, database_metadata_file, thread_pool, force_restore_data); + }; + + size_t default_num_threads = std::max(4UL, std::thread::hardware_concurrency()) * context.getSettingsRef().init_thread_count_scale; + auto load_database_thread_num = std::min(default_num_threads, databases.size()); + + auto load_databases_thread_pool = ThreadPool(load_database_thread_num, load_database_thread_num / 2, load_database_thread_num * 2); + auto load_databases_wait_group = load_databases_thread_pool.waitGroup(); + + auto load_tables_thread_pool = ThreadPool(default_num_threads, default_num_threads / 2, default_num_threads * 2); + + for (const auto & database : databases) + { + const auto & db_name = database.first; + const auto & meta_file = database.second; + + auto task = [&load_database, &context, &db_name, &meta_file, has_force_restore_data_flag, &load_tables_thread_pool] { + load_database(context, db_name, meta_file, &load_tables_thread_pool, has_force_restore_data_flag); + }; + + load_databases_wait_group->schedule(task); + } + + load_databases_wait_group->wait(); if (has_force_restore_data_flag) force_restore_data_flag_file.remove(); diff --git a/dbms/src/Server/BgStorageInit.cpp b/dbms/src/Server/BgStorageInit.cpp index c53953cf25a..ca20a0b44e8 100644 --- a/dbms/src/Server/BgStorageInit.cpp +++ b/dbms/src/Server/BgStorageInit.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -44,12 +45,14 @@ void BgStorageInitHolder::start(Context & global_context, const LoggerPtr & log, "When S3 enabled, lazily_init_store must be true. lazily_init_store={} s3_enabled={}", lazily_init_store, is_s3_enabled); + auto do_init_stores = [&global_context, &log] { auto storages = global_context.getTMTContext().getStorages().getAllStorage(); - int init_cnt = 0; - int err_cnt = 0; - for (auto & [ks_table_id, storage] : storages) - { + + std::atomic init_cnt = 0; + std::atomic err_cnt = 0; + + auto init_stores_function = [&](const auto & ks_table_id, auto & storage) { // This will skip the init of storages that do not contain any data. TiFlash now sync the schema and // create all tables regardless the table have define TiFlash replica or not, so there may be lots // of empty tables in TiFlash. 
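To make the sizing above concrete, a small worked example (16 logical cores is an assumption; init_thread_count_scale defaults to 100 as added in Settings.h above, and the table-loading and storage-init pools are built as ThreadPool(max_threads, max_free_threads, queue_size)):

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
    // Assumption for the example: 16 logical cores (std::thread::hardware_concurrency()).
    const std::size_t logical_cores = 16;
    const std::size_t init_thread_count_scale = 100; // default from Settings.h above

    const std::size_t default_num_threads = std::max<std::size_t>(4, logical_cores) * init_thread_count_scale;

    // The pools are constructed as ThreadPool(max, max / 2, max * 2):
    std::printf("max_threads=%zu max_free_threads=%zu queue_size=%zu\n",
                default_num_threads, default_num_threads / 2, default_num_threads * 2);
    // Prints: max_threads=1600 max_free_threads=800 queue_size=3200
    return 0;
}
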
@@ -66,7 +69,26 @@ void BgStorageInitHolder::start(Context & global_context, const LoggerPtr & log, err_cnt++; tryLogCurrentException(log, fmt::format("Storage inited fail, keyspace_id={} table_id={}", ks_id, table_id)); } + }; + + size_t default_num_threads = std::max(4UL, std::thread::hardware_concurrency()) * global_context.getSettingsRef().init_thread_count_scale; + auto init_storages_thread_pool = ThreadPool(default_num_threads, default_num_threads / 2, default_num_threads * 2); + auto init_storages_wait_group = init_storages_thread_pool.waitGroup(); + + for (auto & iter : storages) + { + const auto & ks_table_id = iter.first; + auto & storage = iter.second; + + auto task = [&init_stores_function, &ks_table_id, &storage] { + init_stores_function(ks_table_id, storage); + }; + + init_storages_wait_group->schedule(task); } + + init_storages_wait_group->wait(); + LOG_INFO( log, "Storage inited finish. [total_count={}] [init_count={}] [error_count={}] [datatype_fullname_count={}]", diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 4705245a166..5920effa5b3 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -521,7 +521,6 @@ struct RaftStoreProxyRunner : boost::noncopyable const LoggerPtr & log; }; - class Server::TcpHttpServersHolder { public: @@ -838,11 +837,10 @@ void adjustThreadPoolSize(const Settings & settings, size_t logical_cores) { // TODO: make BackgroundPool/BlockableBackgroundPool/DynamicThreadPool spawned from `GlobalThreadPool` size_t max_io_thread_count = std::ceil(settings.io_thread_count_scale * logical_cores); - // Note: Global Thread Pool must be larger than sub thread pools. - GlobalThreadPool::instance().setMaxThreads(max_io_thread_count * 20); + GlobalThreadPool::instance().setMaxThreads(max_io_thread_count * 200); GlobalThreadPool::instance().setMaxFreeThreads(max_io_thread_count); - GlobalThreadPool::instance().setQueueSize(max_io_thread_count * 8); + GlobalThreadPool::instance().setQueueSize(max_io_thread_count * 400); if (RNPagePreparerPool::instance) { From 5e646a9c73f2fc8220d111e1ff571f7051eac902 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Apr 2023 14:34:58 +0800 Subject: [PATCH 4/4] Pipeline: support io task thread pool (#7195) ref pingcap/tiflash#6518, close pingcap/tiflash#7222 --- .../Flash/Coprocessor/InterpreterUtils.cpp | 2 +- dbms/src/Flash/Executor/PipelineExecutor.h | 29 ++++--- dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp | 83 ++++++++++++++----- dbms/src/Flash/Pipeline/Exec/PipelineExec.h | 7 ++ .../Schedule/Events/tests/gtest_event.cpp | 2 +- .../Flash/Pipeline/Schedule/TaskScheduler.cpp | 42 +++++++++- .../Flash/Pipeline/Schedule/TaskScheduler.h | 47 +++++++---- .../Pipeline/Schedule/TaskThreadPool.cpp | 40 ++++++--- .../Flash/Pipeline/Schedule/TaskThreadPool.h | 5 +- .../Pipeline/Schedule/TaskThreadPoolImpl.h | 59 +++++++++++++ .../Pipeline/Schedule/Tasks/EventTask.cpp | 5 ++ .../Flash/Pipeline/Schedule/Tasks/EventTask.h | 3 + .../Pipeline/Schedule/Tasks/PipelineTask.cpp | 23 +++++ .../Pipeline/Schedule/Tasks/PipelineTask.h | 2 + dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h | 17 +++- .../Flash/Pipeline/Schedule/WaitReactor.cpp | 20 +++-- .../Schedule/tests/gtest_task_scheduler.cpp | 68 ++++++++++++++- dbms/src/Interpreters/Settings.h | 3 +- dbms/src/Operators/Operator.cpp | 12 +++ dbms/src/Operators/Operator.h | 11 ++- dbms/src/Operators/OperatorHelper.cpp | 3 +- dbms/src/Server/Server.cpp | 5 +- dbms/src/TestUtils/ExecutorTestUtils.cpp | 2 +- 23 files changed, 405 insertions(+), 85 
deletions(-) create mode 100644 dbms/src/Flash/Pipeline/Schedule/TaskThreadPoolImpl.h diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 489d4f5dc18..24592be3749 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -180,8 +180,8 @@ void executeLocalSort( // For order by const col and has limit, we will generate LimitOperator directly. if (limit) { - auto local_limit = std::make_shared(input_header, *limit); group_builder.transform([&](auto & builder) { + auto local_limit = std::make_shared(input_header, *limit); builder.appendTransformOp(std::make_unique>(exec_status, log->identifier(), local_limit)); }); } diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index cbc4f46f295..bc852fef9cd 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -29,19 +29,22 @@ using Pipelines = std::vector; /** * PipelineExecutor is the implementation of the pipeline-based execution model. * - * ┌─────────────────────┐ - * │ task scheduler │ - * generate submit tasks │ │ - * pipeline ────────► event1 ─────────────► │ │ - * │ ┌────────────────┐ │ - * │ trigger │ │task thread pool│ │ - * ▼ │ └──────▲──┬──────┘ │ - * submit tasks │ │ │ │ - * event2 ─────────────► │ ┌────┴──▼────┐ │ - * │ │wait reactor│ │ - * │ └────────────┘ │ - * │ │ - * └─────────────────────┘ + * ┌────────────────────────────┐ + * │ task scheduler │ + * generate submit tasks │ │ + * pipeline ────────► event1 ─────────────► │ ┌───────────────────┐ │ + * │ ┌──┤io task thread pool◄─┐ │ + * │ trigger │ │ └──────▲──┬─────────┘ │ │ + * ▼ │ │ │ │ │ │ + * submit tasks │ │ ┌───────┴──▼─────────┐ │ │ + * event2 ─────────────► │ │ │cpu task thread pool│ │ │ + * │ │ └───────▲──┬─────────┘ │ │ + * │ trigger │ │ │ │ │ │ + * ▼ │ │ ┌────┴──▼────┐ │ │ + * submit tasks │ └────►wait reactor├──────┘ │ + * event3 ─────────────► │ └────────────┘ │ + * │ │ + * └────────────────────────────┘ * * As shown above, the pipeline generates a number of events, which are executed in dependency order, * and the events generate a number of tasks that will be submitted to the TaskScheduler for execution. diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index 22e408eea74..a1501dc3883 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -17,10 +17,39 @@ namespace DB { +#define HANDLE_OP_STATUS(op, op_status, expect_status) \ + switch (op_status) \ + { \ + case (expect_status): \ + break; \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ + case OperatorStatus::IO: \ + assert(!io_op); \ + assert(op); \ + io_op.emplace((op).get()); \ + /* For unexpected status, an immediate return is required. */ \ + default: \ + return (op_status); \ + } + +#define HANDLE_LAST_OP_STATUS(op, op_status) \ + assert(op); \ + switch (op_status) \ + { \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ + case OperatorStatus::IO: \ + assert(!io_op); \ + assert(op); \ + io_op.emplace((op).get()); \ + /* For the last operator, the status will always be returned. 
*/ \ + default: \ + return (op_status); \ + } + void PipelineExec::executePrefix() { sink_op->operatePrefix(); - for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) + for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert) (*it)->operatePrefix(); source_op->operatePrefix(); } @@ -28,7 +57,7 @@ void PipelineExec::executePrefix() void PipelineExec::executeSuffix() { sink_op->operateSuffix(); - for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) + for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert) (*it)->operateSuffix(); source_op->operateSuffix(); } @@ -63,11 +92,10 @@ OperatorStatus PipelineExec::executeImpl() { const auto & transform_op = transform_ops[transform_op_index]; op_status = transform_op->transform(block); - if (op_status != OperatorStatus::HAS_OUTPUT) - return op_status; + HANDLE_OP_STATUS(transform_op, op_status, OperatorStatus::HAS_OUTPUT); } op_status = sink_op->write(std::move(block)); - return op_status; + HANDLE_LAST_OP_STATUS(sink_op, op_status); } // try fetch block from transform_ops and source_op. @@ -76,21 +104,36 @@ OperatorStatus PipelineExec::fetchBlock( size_t & start_transform_op_index) { auto op_status = sink_op->prepare(); - if (op_status != OperatorStatus::NEED_INPUT) - return op_status; + HANDLE_OP_STATUS(sink_op, op_status, OperatorStatus::NEED_INPUT); for (int64_t index = transform_ops.size() - 1; index >= 0; --index) { const auto & transform_op = transform_ops[index]; op_status = transform_op->tryOutput(block); - if (op_status != OperatorStatus::NEED_INPUT) - { - // Once the transform op tryOutput has succeeded, execution will begin with the next transform op. - start_transform_op_index = index + 1; - return op_status; - } + // Once the transform op tryOutput has succeeded, execution will begin with the next transform op. + start_transform_op_index = index + 1; + HANDLE_OP_STATUS(transform_op, op_status, OperatorStatus::NEED_INPUT); } start_transform_op_index = 0; op_status = source_op->read(block); + HANDLE_LAST_OP_STATUS(source_op, op_status); +} + +OperatorStatus PipelineExec::executeIO() +{ + auto op_status = executeIOImpl(); +#ifndef NDEBUG + // `NEED_INPUT` means that pipeline_exec need data to do the calculations and expect the next call to `execute`. + // `HAS_OUTPUT` means that pipeline_exec has data to do the calculations and expect the next call to `execute`. + assertOperatorStatus(op_status, {OperatorStatus::FINISHED, OperatorStatus::HAS_OUTPUT, OperatorStatus::NEED_INPUT}); +#endif + return op_status; +} +OperatorStatus PipelineExec::executeIOImpl() +{ + assert(io_op && *io_op); + auto op_status = (*io_op)->executeIO(); + if (op_status != OperatorStatus::IO) + io_op.reset(); return op_status; } @@ -106,17 +149,19 @@ OperatorStatus PipelineExec::await() OperatorStatus PipelineExec::awaitImpl() { auto op_status = sink_op->await(); - if (op_status != OperatorStatus::NEED_INPUT) - return op_status; - for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) + HANDLE_OP_STATUS(sink_op, op_status, OperatorStatus::NEED_INPUT); + for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert) { // If the transform_op returns `NEED_INPUT`, // we need to call the upstream transform_op until a transform_op returns something other than `NEED_INPUT`. 
op_status = (*it)->await(); - if (op_status != OperatorStatus::NEED_INPUT) - return op_status; + HANDLE_OP_STATUS((*it), op_status, OperatorStatus::NEED_INPUT); } op_status = source_op->await(); - return op_status; + HANDLE_LAST_OP_STATUS(source_op, op_status); } + +#undef HANDLE_OP_STATUS +#undef HANDLE_LAST_OP_STATUS + } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h index 7fb845b3df1..86fdbe75610 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h @@ -40,11 +40,15 @@ class PipelineExec : private boost::noncopyable OperatorStatus execute(); + OperatorStatus executeIO(); + OperatorStatus await(); private: OperatorStatus executeImpl(); + OperatorStatus executeIOImpl(); + OperatorStatus awaitImpl(); OperatorStatus fetchBlock( @@ -55,6 +59,9 @@ class PipelineExec : private boost::noncopyable SourceOpPtr source_op; TransformOps transform_ops; SinkOpPtr sink_op; + + // hold the operator which is ready for executing io. + std::optional io_op; }; using PipelineExecPtr = std::unique_ptr; // a set of pipeline_execs running in parallel. diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp index e830a7c04e0..ecff5988d8e 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp @@ -320,7 +320,7 @@ class EventTestRunner : public ::testing::Test void SetUp() override { - TaskSchedulerConfig config{thread_num}; + TaskSchedulerConfig config{thread_num, thread_num}; assert(!TaskScheduler::instance); TaskScheduler::instance = std::make_unique(config); } diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp index d412b91c730..771484e0266 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp @@ -23,17 +23,20 @@ namespace DB { TaskScheduler::TaskScheduler(const TaskSchedulerConfig & config) - : task_thread_pool(*this, config.task_thread_pool_size) + : cpu_task_thread_pool(*this, config.cpu_task_thread_pool_size) + , io_task_thread_pool(*this, config.io_task_thread_pool_size) , wait_reactor(*this) { } TaskScheduler::~TaskScheduler() { - task_thread_pool.close(); + cpu_task_thread_pool.close(); + io_task_thread_pool.close(); wait_reactor.close(); - task_thread_pool.waitForStop(); + cpu_task_thread_pool.waitForStop(); + io_task_thread_pool.waitForStop(); wait_reactor.waitForStop(); } @@ -44,6 +47,7 @@ void TaskScheduler::submit(std::vector & tasks) noexcept // The memory tracker is set by the caller. 
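    // Dispatch by status: RUNNING tasks go to the cpu task thread pool, IO tasks to the
    // io task thread pool, and WAITING tasks to the wait reactor.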
std::vector running_tasks; + std::vector io_tasks; std::list waiting_tasks; for (auto & task : tasks) { @@ -55,6 +59,9 @@ void TaskScheduler::submit(std::vector & tasks) noexcept case ExecTaskStatus::RUNNING: running_tasks.push_back(std::move(task)); break; + case ExecTaskStatus::IO: + io_tasks.push_back(std::move(task)); + break; case ExecTaskStatus::WAITING: waiting_tasks.push_back(std::move(task)); break; @@ -66,9 +73,36 @@ void TaskScheduler::submit(std::vector & tasks) noexcept } } tasks.clear(); - task_thread_pool.submit(running_tasks); + cpu_task_thread_pool.submit(running_tasks); + io_task_thread_pool.submit(io_tasks); wait_reactor.submit(waiting_tasks); } +void TaskScheduler::submitToWaitReactor(TaskPtr && task) +{ + wait_reactor.submit(std::move(task)); +} + +void TaskScheduler::submitToCPUTaskThreadPool(TaskPtr && task) +{ + cpu_task_thread_pool.submit(std::move(task)); +} + +void TaskScheduler::submitToCPUTaskThreadPool(std::vector & tasks) +{ + cpu_task_thread_pool.submit(tasks); +} + +void TaskScheduler::submitToIOTaskThreadPool(TaskPtr && task) +{ + io_task_thread_pool.submit(std::move(task)); +} + +void TaskScheduler::submitToIOTaskThreadPool(std::vector & tasks) +{ + io_task_thread_pool.submit(tasks); +} + std::unique_ptr TaskScheduler::instance; + } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h index 62348687572..b892b5d7a88 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -23,25 +24,31 @@ namespace DB { struct TaskSchedulerConfig { - size_t task_thread_pool_size; + size_t cpu_task_thread_pool_size; + size_t io_task_thread_pool_size; }; /** - * ┌─────────────────────┐ - * │ task scheduler │ - * │ │ - * │ ┌────────────────┐ │ - * │ │task thread pool│ │ - * │ └──────▲──┬──────┘ │ - * │ │ │ │ - * │ ┌────┴──▼────┐ │ - * │ │wait reactor│ │ - * │ └────────────┘ │ - * │ │ - * └─────────────────────┘ + * ┌────────────────────────────┐ + * │ task scheduler │ + * │ │ + * │ ┌───────────────────┐ │ + * │ ┌──┤io task thread pool◄─┐ │ + * │ │ └──────▲──┬─────────┘ │ │ + * │ │ │ │ │ │ + * │ │ ┌───────┴──▼─────────┐ │ │ + * │ │ │cpu task thread pool│ │ │ + * │ │ └───────▲──┬─────────┘ │ │ + * │ │ │ │ │ │ + * │ │ ┌────┴──▼────┐ │ │ + * │ └────►wait reactor├──────┘ │ + * │ └────────────┘ │ + * │ │ + * └────────────────────────────┘ * * A globally shared execution scheduler, used by pipeline executor. - * - task thread pool: for operator compute. + * - cpu task thread pool: for operator cpu intensive compute. + * - io task thread pool: for operator io intensive block. * - wait reactor: for polling asynchronous io status, etc. 
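 *
 * A task migrates between the three components according to the status it returns:
 * RUNNING keeps it on the cpu task thread pool, IO sends it to the io task thread pool,
 * and WAITING parks it in the wait reactor until it is ready to run again.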
*/ class TaskScheduler @@ -53,16 +60,24 @@ class TaskScheduler void submit(std::vector & tasks) noexcept; + void submitToWaitReactor(TaskPtr && task); + void submitToCPUTaskThreadPool(TaskPtr && task); + void submitToCPUTaskThreadPool(std::vector & tasks); + void submitToIOTaskThreadPool(TaskPtr && task); + void submitToIOTaskThreadPool(std::vector & tasks); + static std::unique_ptr instance; private: - TaskThreadPool task_thread_pool; + TaskThreadPool cpu_task_thread_pool; + + TaskThreadPool io_task_thread_pool; WaitReactor wait_reactor; LoggerPtr logger = Logger::get(); - friend class TaskThreadPool; + friend class TaskThreadPool; friend class WaitReactor; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.cpp index 449392399cc..b3f7a347775 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.cpp @@ -15,17 +15,18 @@ #include #include #include -#include #include #include +#include #include #include #include namespace DB { -TaskThreadPool::TaskThreadPool(TaskScheduler & scheduler_, size_t thread_num) - : task_queue(std::make_unique()) +template +TaskThreadPool::TaskThreadPool(TaskScheduler & scheduler_, size_t thread_num) + : task_queue(Impl::newTaskQueue()) , scheduler(scheduler_) { RUNTIME_CHECK(thread_num > 0); @@ -34,19 +35,22 @@ TaskThreadPool::TaskThreadPool(TaskScheduler & scheduler_, size_t thread_num) threads.emplace_back(&TaskThreadPool::loop, this, i); } -void TaskThreadPool::close() +template +void TaskThreadPool::close() { task_queue->close(); } -void TaskThreadPool::waitForStop() +template +void TaskThreadPool::waitForStop() { for (auto & thread : threads) thread.join(); LOG_INFO(logger, "task thread pool is stopped"); } -void TaskThreadPool::loop(size_t thread_no) noexcept +template +void TaskThreadPool::loop(size_t thread_no) noexcept { auto thread_no_str = fmt::format("thread_no={}", thread_no); auto thread_logger = logger->getChild(thread_no_str); @@ -65,7 +69,8 @@ void TaskThreadPool::loop(size_t thread_no) noexcept LOG_INFO(thread_logger, "loop finished"); } -void TaskThreadPool::handleTask(TaskPtr & task, const LoggerPtr & log) noexcept +template +void TaskThreadPool::handleTask(TaskPtr & task, const LoggerPtr & log) noexcept { assert(task); TRACE_MEMORY(task); @@ -74,19 +79,22 @@ void TaskThreadPool::handleTask(TaskPtr & task, const LoggerPtr & log) noexcept ExecTaskStatus status; while (true) { - status = task->execute(); + status = Impl::exec(task); // The executing task should yield if it takes more than `YIELD_MAX_TIME_SPENT_NS`. 
- if (status != ExecTaskStatus::RUNNING || stopwatch.elapsed() >= YIELD_MAX_TIME_SPENT_NS) + if (status != Impl::TargetStatus || stopwatch.elapsed() >= YIELD_MAX_TIME_SPENT_NS) break; } switch (status) { case ExecTaskStatus::RUNNING: - submit(std::move(task)); + scheduler.submitToCPUTaskThreadPool(std::move(task)); + break; + case ExecTaskStatus::IO: + scheduler.submitToIOTaskThreadPool(std::move(task)); break; case ExecTaskStatus::WAITING: - scheduler.wait_reactor.submit(std::move(task)); + scheduler.submitToWaitReactor(std::move(task)); break; case FINISH_STATUS: task.reset(); @@ -96,13 +104,19 @@ void TaskThreadPool::handleTask(TaskPtr & task, const LoggerPtr & log) noexcept } } -void TaskThreadPool::submit(TaskPtr && task) noexcept +template +void TaskThreadPool::submit(TaskPtr && task) noexcept { task_queue->submit(std::move(task)); } -void TaskThreadPool::submit(std::vector & tasks) noexcept +template +void TaskThreadPool::submit(std::vector & tasks) noexcept { task_queue->submit(tasks); } + +template class TaskThreadPool; +template class TaskThreadPool; + } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.h b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.h index 2f4fc3d1903..1fffd80b631 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.h @@ -25,6 +25,7 @@ namespace DB { class TaskScheduler; +template class TaskThreadPool { public: @@ -44,9 +45,9 @@ class TaskThreadPool void handleTask(TaskPtr & task, const LoggerPtr & log) noexcept; private: - TaskQueuePtr task_queue; + typename Impl::QueueType task_queue; - LoggerPtr logger = Logger::get(); + LoggerPtr logger = Logger::get(Impl::NAME); TaskScheduler & scheduler; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskThreadPoolImpl.h b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPoolImpl.h new file mode 100644 index 00000000000..abc9dd65c39 --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/TaskThreadPoolImpl.h @@ -0,0 +1,59 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +namespace DB +{ +struct CPUImpl +{ + static constexpr auto NAME = "cpu intensive"; + + static constexpr auto TargetStatus = ExecTaskStatus::RUNNING; + + static ExecTaskStatus exec(TaskPtr & task) + { + return task->execute(); + } + + using QueueType = std::unique_ptr; + + static QueueType newTaskQueue() + { + return std::make_unique(); + } +}; + +struct IOImpl +{ + static constexpr auto NAME = "io intensive"; + + static constexpr auto TargetStatus = ExecTaskStatus::IO; + + static ExecTaskStatus exec(TaskPtr & task) + { + return task->executeIO(); + } + + using QueueType = std::unique_ptr; + + static QueueType newTaskQueue() + { + return std::make_unique(); + } +}; +} // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp index 018a408e784..c46fd8ebb75 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp @@ -69,6 +69,11 @@ ExecTaskStatus EventTask::executeImpl() noexcept return doTaskAction([&] { return doExecuteImpl(); }); } +ExecTaskStatus EventTask::executeIOImpl() noexcept +{ + return doTaskAction([&] { return doExecuteIOImpl(); }); +} + ExecTaskStatus EventTask::awaitImpl() noexcept { return doTaskAction([&] { return doAwaitImpl(); }); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h index 80163a80ece..134de0eb979 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h @@ -40,6 +40,9 @@ class EventTask : public Task ExecTaskStatus executeImpl() noexcept override; virtual ExecTaskStatus doExecuteImpl() = 0; + ExecTaskStatus executeIOImpl() noexcept override; + virtual ExecTaskStatus doExecuteIOImpl() { return ExecTaskStatus::RUNNING; }; + ExecTaskStatus awaitImpl() noexcept override; virtual ExecTaskStatus doAwaitImpl() { return ExecTaskStatus::RUNNING; }; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp index e5820b8d893..f76cc84a64a 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp @@ -48,6 +48,10 @@ void PipelineTask::finalizeImpl() { \ return ExecTaskStatus::CANCELLED; \ } \ + case OperatorStatus::IO: \ + { \ + return ExecTaskStatus::IO; \ + } \ case OperatorStatus::WAITING: \ { \ return ExecTaskStatus::WAITING; \ @@ -72,6 +76,25 @@ ExecTaskStatus PipelineTask::doExecuteImpl() } } +ExecTaskStatus PipelineTask::doExecuteIOImpl() +{ + assert(pipeline_exec); + auto op_status = pipeline_exec->executeIO(); + switch (op_status) + { + HANDLE_NOT_RUNNING_STATUS + // After `pipeline_exec->executeIO`, + // - `NEED_INPUT` means that pipeline_exec need data to do the calculations and expect the next call to `execute` + // - `HAS_OUTPUT` means that pipeline_exec has data to do the calculations and expect the next call to `execute` + // And other states are unexpected. 
+ case OperatorStatus::NEED_INPUT: + case OperatorStatus::HAS_OUTPUT: + return ExecTaskStatus::RUNNING; + default: + UNEXPECTED_OP_STATUS(op_status, "PipelineTask::execute"); + } +} + ExecTaskStatus PipelineTask::doAwaitImpl() { assert(pipeline_exec); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h index 35425785e90..0d59e20f5ea 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h @@ -32,6 +32,8 @@ class PipelineTask : public EventTask protected: ExecTaskStatus doExecuteImpl() override; + ExecTaskStatus doExecuteIOImpl() override; + ExecTaskStatus doAwaitImpl() override; void finalizeImpl() override; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 1d13ca6bf18..309c4ba7c3c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -34,7 +34,10 @@ extern const char random_pipeline_model_task_construct_failpoint[]; * ▲ * │ * ┌────────────────────────┐ - * │ WATITING◄─────►RUNNING │ + * │ ┌──►RUNNING◄──┐ │ + * │ │ │ │ + * │ ▼ ▼ │ + * │ WATITING◄────────►IO │ * └────────────────────────┘ */ enum class ExecTaskStatus @@ -42,6 +45,7 @@ enum class ExecTaskStatus INIT, WAITING, RUNNING, + IO, FINISHED, ERROR, CANCELLED, @@ -74,19 +78,30 @@ class Task ExecTaskStatus execute() noexcept { assert(getMemTracker().get() == current_memory_tracker); + assert(exec_status == ExecTaskStatus::INIT || exec_status == ExecTaskStatus::RUNNING); switchStatus(executeImpl()); return exec_status; } + ExecTaskStatus executeIO() noexcept + { + assert(getMemTracker().get() == current_memory_tracker); + assert(exec_status == ExecTaskStatus::INIT || exec_status == ExecTaskStatus::IO); + switchStatus(executeIOImpl()); + return exec_status; + } + ExecTaskStatus await() noexcept { assert(getMemTracker().get() == current_memory_tracker); + assert(exec_status == ExecTaskStatus::INIT || exec_status == ExecTaskStatus::WAITING); switchStatus(awaitImpl()); return exec_status; } protected: virtual ExecTaskStatus executeImpl() noexcept = 0; + virtual ExecTaskStatus executeIOImpl() noexcept { return ExecTaskStatus::RUNNING; } // Avoid allocating memory in `await` if possible. virtual ExecTaskStatus awaitImpl() noexcept { return ExecTaskStatus::RUNNING; } diff --git a/dbms/src/Flash/Pipeline/Schedule/WaitReactor.cpp b/dbms/src/Flash/Pipeline/Schedule/WaitReactor.cpp index a40f237902a..ac08f19abf6 100644 --- a/dbms/src/Flash/Pipeline/Schedule/WaitReactor.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/WaitReactor.cpp @@ -29,8 +29,8 @@ namespace class Spinner { public: - Spinner(TaskThreadPool & task_thread_pool_, const LoggerPtr & logger_) - : task_thread_pool(task_thread_pool_) + Spinner(TaskScheduler & task_scheduler_, const LoggerPtr & logger_) + : task_scheduler(task_scheduler_) , logger(logger_->getChild("Spinner")) {} @@ -45,6 +45,9 @@ class Spinner case ExecTaskStatus::RUNNING: running_tasks.push_back(std::move(task)); return true; + case ExecTaskStatus::IO: + io_tasks.push_back(std::move(task)); + return true; case ExecTaskStatus::WAITING: return false; case FINISH_STATUS: @@ -58,11 +61,15 @@ class Spinner // return false if there are no ready task to submit. 
bool submitReadyTasks() { - if (running_tasks.empty()) + if (running_tasks.empty() && io_tasks.empty()) return false; - task_thread_pool.submit(running_tasks); + task_scheduler.submitToCPUTaskThreadPool(running_tasks); running_tasks.clear(); + + task_scheduler.submitToIOTaskThreadPool(io_tasks); + io_tasks.clear(); + spin_count = 0; return true; } @@ -84,13 +91,14 @@ class Spinner } private: - TaskThreadPool & task_thread_pool; + TaskScheduler & task_scheduler; LoggerPtr logger; int16_t spin_count = 0; std::vector running_tasks; + std::vector io_tasks; }; } // namespace @@ -127,7 +135,7 @@ void WaitReactor::loop() noexcept LOG_INFO(logger, "start wait reactor loop"); ASSERT_MEMORY_TRACKER - Spinner spinner{scheduler.task_thread_pool, logger}; + Spinner spinner{scheduler, logger}; std::list local_waiting_tasks; // Get the incremental tasks from waiting_task_list. // return false if waiting_task_list has been closed. diff --git a/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp b/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp index 67cb76d472a..a0ea50cdb7a 100644 --- a/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/tests/gtest_task_scheduler.cpp @@ -124,10 +124,59 @@ class SimpleWaitingTask : public Task Waiter & waiter; }; +class SimpleBlockedTask : public Task +{ +public: + explicit SimpleBlockedTask(Waiter & waiter_) + : waiter(waiter_) + {} + + ~SimpleBlockedTask() + { + waiter.notify(); + } + +protected: + ExecTaskStatus executeImpl() noexcept override + { + if (loop_count > 0) + { + if ((loop_count % 2) == 0) + return ExecTaskStatus::IO; + else + { + --loop_count; + return ExecTaskStatus::RUNNING; + } + } + return ExecTaskStatus::FINISHED; + } + + ExecTaskStatus executeIOImpl() noexcept override + { + if (loop_count > 0) + { + if ((loop_count % 2) == 0) + { + --loop_count; + return ExecTaskStatus::IO; + } + else + return ExecTaskStatus::RUNNING; + } + return ExecTaskStatus::FINISHED; + } + +private: + int loop_count = 10 + random() % 10; + Waiter & waiter; +}; + enum class TraceTaskStatus { initing, running, + io, waiting, }; class MemoryTraceTask : public Task @@ -152,6 +201,9 @@ class MemoryTraceTask : public Task switch (status) { case TraceTaskStatus::initing: + status = TraceTaskStatus::io; + return ExecTaskStatus::IO; + case TraceTaskStatus::io: status = TraceTaskStatus::waiting; return ExecTaskStatus::WAITING; case TraceTaskStatus::waiting: @@ -165,6 +217,13 @@ class MemoryTraceTask : public Task } } + ExecTaskStatus executeIOImpl() noexcept override + { + assert(status == TraceTaskStatus::io); + CurrentMemoryTracker::alloc(MEMORY_TRACER_SUBMIT_THRESHOLD + 10); + return ExecTaskStatus::RUNNING; + } + ExecTaskStatus awaitImpl() noexcept override { if (status == TraceTaskStatus::waiting) @@ -186,6 +245,11 @@ class DeadLoopTask : public Task } ExecTaskStatus awaitImpl() noexcept override + { + return ExecTaskStatus::IO; + } + + ExecTaskStatus executeIOImpl() noexcept override { return ExecTaskStatus::RUNNING; } @@ -199,7 +263,7 @@ class TaskSchedulerTestRunner : public ::testing::Test void submitAndWait(std::vector & tasks, Waiter & waiter) { - TaskSchedulerConfig config{thread_num}; + TaskSchedulerConfig config{thread_num, thread_num}; TaskScheduler task_scheduler{config}; task_scheduler.submit(tasks); waiter.wait(); @@ -255,7 +319,7 @@ TEST_F(TaskSchedulerTestRunner, shutdown) try { auto do_test = [](size_t task_thread_pool_size, size_t task_num) { - TaskSchedulerConfig 
config{task_thread_pool_size}; + TaskSchedulerConfig config{task_thread_pool_size, task_thread_pool_size}; TaskScheduler task_scheduler{config}; std::vector tasks; for (size_t i = 0; i < task_num; ++i) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 7e896d7b097..c12a10c86c5 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -304,7 +304,8 @@ struct Settings M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \ M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ - M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \ + M(SettingUInt64, pipeline_cpu_task_thread_pool_size, 0, "The size of cpu task thread pool. 0 means using number_of_logical_cpu_cores.") \ + M(SettingUInt64, pipeline_io_task_thread_pool_size, 0, "The size of io task thread pool. 0 means using number_of_logical_cpu_cores.") \ M(SettingUInt64, local_tunnel_version, 2, "1: not refined, 2: refined") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index 2350c964561..2d6780f5908 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -43,6 +43,18 @@ OperatorStatus Operator::await() return op_status; } +OperatorStatus Operator::executeIO() +{ + CHECK_IS_CANCELLED + // TODO collect operator profile info here. + auto op_status = executeIOImpl(); +#ifndef NDEBUG + assertOperatorStatus(op_status, {OperatorStatus::NEED_INPUT, OperatorStatus::HAS_OUTPUT}); +#endif + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_operator_run_failpoint); + return op_status; +} + OperatorStatus SourceOp::read(Block & block) { CHECK_IS_CANCELLED diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index 4bca48df754..3e3089dfe63 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -24,7 +24,7 @@ namespace DB /** * All interfaces of the operator may return the following state. * - finish status will only be returned by sink op, because only sink can tell if the pipeline has actually finished. - * - cancel status and waiting status can be returned in all method of operator. + * - cancel status, waiting status and io status can be returned in all method of operator. * - operator may return a different running status depending on the method. */ enum class OperatorStatus @@ -35,6 +35,8 @@ enum class OperatorStatus CANCELLED, /// waiting status WAITING, + /// io status + IO, /// running status // means that TransformOp/SinkOp needs to input a block to do the calculation, NEED_INPUT, @@ -55,11 +57,14 @@ class Operator {} virtual ~Operator() = default; - // running status may return are - // - `NEED_INPUT` means that the data that the operator is waiting for has been prepared. + // running status may return are NEED_INPUT and HAS_OUTPUT here. OperatorStatus await(); virtual OperatorStatus awaitImpl() { throw Exception("Unsupport"); } + // running status may return are NEED_INPUT and HAS_OUTPUT here. + OperatorStatus executeIO(); + virtual OperatorStatus executeIOImpl() { throw Exception("Unsupport"); } + // These two methods are used to set state, log and etc, and should not perform calculation logic. 
virtual void operatePrefix() {} virtual void operateSuffix() {} diff --git a/dbms/src/Operators/OperatorHelper.cpp b/dbms/src/Operators/OperatorHelper.cpp index 8dac56f9837..d6ee238e50f 100644 --- a/dbms/src/Operators/OperatorHelper.cpp +++ b/dbms/src/Operators/OperatorHelper.cpp @@ -25,9 +25,10 @@ void assertOperatorStatus( { switch (status) { - // cancel status and waiting status can be returned in all method of operator. + // cancel status, waiting and io status can be returned in all method of operator. case OperatorStatus::CANCELLED: case OperatorStatus::WAITING: + case OperatorStatus::IO: return; default: { diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 5920effa5b3..94b5375fdea 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1556,7 +1556,10 @@ int Server::main(const std::vector & /*args*/) auto get_pool_size = [](const auto & setting) { return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast(setting); }; - TaskSchedulerConfig config{get_pool_size(settings.pipeline_task_thread_pool_size)}; + TaskSchedulerConfig config{ + get_pool_size(settings.pipeline_cpu_task_thread_pool_size), + get_pool_size(settings.pipeline_io_task_thread_pool_size), + }; assert(!TaskScheduler::instance); TaskScheduler::instance = std::make_unique(config); } diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 168a9c32d2f..e010bf73f04 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -60,7 +60,7 @@ void ExecutorTest::SetUp() { initializeContext(); initializeClientInfo(); - TaskSchedulerConfig config{8}; + TaskSchedulerConfig config{8, 8}; assert(!TaskScheduler::instance); TaskScheduler::instance = std::make_unique(config); }
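
For reference, below is a minimal sketch of a user-defined task that exercises the new io path, modeled on SimpleBlockedTask and TaskSchedulerTestRunner above. It is illustrative only and not part of the patch: the include paths are assumed to follow the dbms/src layout of the files touched here, and TaskPtr is assumed to be std::unique_ptr<Task>, as suggested by the std::move/reset usage in the scheduler code.

#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/Task.h>

#include <memory>
#include <vector>

namespace DB
{
// Runs one round of "cpu work, then blocking io, then cpu work again":
// executeImpl() returns IO to ask the scheduler to move the task to the io task thread pool,
// executeIOImpl() returns RUNNING to move it back to the cpu task thread pool,
// and the second executeImpl() call finishes the task.
class ComputeThenIOTask : public Task
{
protected:
    ExecTaskStatus executeImpl() noexcept override
    {
        return io_done ? ExecTaskStatus::FINISHED : ExecTaskStatus::IO;
    }

    ExecTaskStatus executeIOImpl() noexcept override
    {
        io_done = true; // the blocking read/write would happen here
        return ExecTaskStatus::RUNNING;
    }

private:
    bool io_done = false;
};
} // namespace DB

// Usage, mirroring TaskSchedulerTestRunner::submitAndWait:
//   TaskSchedulerConfig config{/*cpu_task_thread_pool_size=*/8, /*io_task_thread_pool_size=*/8};
//   TaskScheduler task_scheduler{config};
//   std::vector<TaskPtr> tasks;
//   tasks.push_back(std::make_unique<DB::ComputeThenIOTask>());
//   task_scheduler.submit(tasks);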