From d1a91f6d1171f089c0ae1aaab475383612230929 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 2 Feb 2023 15:19:57 +0800 Subject: [PATCH] Move `exec_status` from `PipelineExec` to `Operator` (#6711) ref pingcap/tiflash#6518 Signed-off-by: ywqzzy <592838129@qq.com> --- dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp | 15 --------------- dbms/src/Flash/Pipeline/Exec/PipelineExec.h | 8 +------- .../Flash/Pipeline/Exec/PipelineExecBuilder.cpp | 7 +++---- .../Flash/Pipeline/Exec/PipelineExecBuilder.h | 10 ++++++++-- .../Exec/tests/gtest_simple_operator.cpp | 4 ++-- dbms/src/Flash/Pipeline/Pipeline.cpp | 4 ++-- dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp | 2 +- .../Planner/Plans/PhysicalGetResultSink.cpp | 2 +- dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp | 2 +- .../Plans/PhysicalMockExchangeReceiver.cpp | 2 +- .../Planner/Plans/PhysicalMockTableScan.cpp | 2 +- .../Flash/Planner/Plans/PhysicalProjection.cpp | 2 +- dbms/src/Operators/BlockInputStreamSourceOp.cpp | 4 +++- dbms/src/Operators/BlockInputStreamSourceOp.h | 2 +- dbms/src/Operators/ExpressionTransformOp.h | 4 +++- dbms/src/Operators/FilterTransformOp.h | 4 +++- dbms/src/Operators/GetResultSinkOp.h | 7 +++++-- dbms/src/Operators/LimitTransformOp.h | 4 +++- dbms/src/Operators/Operator.cpp | 15 +++++++++++++++ dbms/src/Operators/Operator.h | 16 ++++++++++++++++ 20 files changed, 71 insertions(+), 45 deletions(-) diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index d4d86e6be56..b694b49da98 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -12,16 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include namespace DB { -#define CHECK_IS_CANCELLED \ - if (unlikely(exec_status.isCancelled())) \ - return OperatorStatus::CANCELLED; - OperatorStatus PipelineExec::execute() { auto op_status = executeImpl(); @@ -50,13 +45,11 @@ OperatorStatus PipelineExec::executeImpl() // start from the next transform op after fetched block transform op. for (size_t transform_op_index = start_transform_op_index; transform_op_index < transform_ops.size(); ++transform_op_index) { - CHECK_IS_CANCELLED; const auto & transform_op = transform_ops[transform_op_index]; op_status = transform_op->transform(block); if (op_status != OperatorStatus::HAS_OUTPUT) return op_status; } - CHECK_IS_CANCELLED; op_status = sink_op->write(std::move(block)); return op_status; } @@ -66,13 +59,11 @@ OperatorStatus PipelineExec::fetchBlock( Block & block, size_t & start_transform_op_index) { - CHECK_IS_CANCELLED; auto op_status = sink_op->prepare(); if (op_status != OperatorStatus::NEED_INPUT) return op_status; for (int64_t index = transform_ops.size() - 1; index >= 0; --index) { - CHECK_IS_CANCELLED; const auto & transform_op = transform_ops[index]; op_status = transform_op->tryOutput(block); if (op_status != OperatorStatus::NEED_INPUT) @@ -82,7 +73,6 @@ OperatorStatus PipelineExec::fetchBlock( return op_status; } } - CHECK_IS_CANCELLED; start_transform_op_index = 0; op_status = source_op->read(block); return op_status; @@ -99,8 +89,6 @@ OperatorStatus PipelineExec::await() } OperatorStatus PipelineExec::awaitImpl() { - CHECK_IS_CANCELLED; - auto op_status = sink_op->await(); if (op_status != OperatorStatus::NEED_INPUT) return op_status; @@ -115,7 +103,4 @@ OperatorStatus PipelineExec::awaitImpl() op_status = source_op->await(); return op_status; } - -#undef CHECK_IS_CANCELLED - } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h index e1ec3e8c14e..eef9a75c557 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h @@ -20,8 +20,6 @@ namespace DB { -class PipelineExecutorStatus; - // The executor for push model operator. // A pipeline will generate multiple pipeline_execs. // data flow: source --> transform --> .. --> transform --> sink @@ -29,12 +27,10 @@ class PipelineExec { public: PipelineExec( - PipelineExecutorStatus & exec_status_, SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_) - : exec_status(exec_status_) - , source_op(std::move(source_op_)) + : source_op(std::move(source_op_)) , transform_ops(std::move(transform_ops_)) , sink_op(std::move(sink_op_)) {} @@ -53,8 +49,6 @@ class PipelineExec size_t & start_transform_op_index); private: - PipelineExecutorStatus & exec_status; - SourceOpPtr source_op; TransformOps transform_ops; SinkOpPtr sink_op; diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp index eb47d5f9eef..73b82e6865b 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp @@ -36,11 +36,10 @@ void PipelineExecBuilder::setSinkOp(SinkOpPtr && sink_op_) sink_op = std::move(sink_op_); } -PipelineExecPtr PipelineExecBuilder::build(PipelineExecutorStatus & exec_status) +PipelineExecPtr PipelineExecBuilder::build() { assert(source_op && sink_op); return std::make_unique( - exec_status, std::move(source_op), std::move(transform_ops), std::move(sink_op)); @@ -67,12 +66,12 @@ void PipelineExecGroupBuilder::init(size_t init_concurrency) group.resize(concurrency); } -PipelineExecGroup PipelineExecGroupBuilder::build(PipelineExecutorStatus & exec_status) +PipelineExecGroup PipelineExecGroupBuilder::build() { assert(concurrency > 0); PipelineExecGroup pipeline_exec_group; for (auto & builder : group) - pipeline_exec_group.push_back(builder.build(exec_status)); + pipeline_exec_group.push_back(builder.build()); return pipeline_exec_group; } diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h index 1526e610cd9..4f0a986a932 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h @@ -32,7 +32,7 @@ struct PipelineExecBuilder Block getCurrentHeader() const; - PipelineExecPtr build(PipelineExecutorStatus & exec_status); + PipelineExecPtr build(); }; struct PipelineExecGroupBuilder @@ -41,6 +41,12 @@ struct PipelineExecGroupBuilder using BuilderGroup = std::vector; BuilderGroup group; + explicit PipelineExecGroupBuilder(PipelineExecutorStatus & exec_status_) + : exec_status(exec_status_) + {} + + PipelineExecutorStatus & exec_status; + size_t concurrency = 0; void init(size_t init_concurrency); @@ -56,7 +62,7 @@ struct PipelineExecGroupBuilder } } - PipelineExecGroup build(PipelineExecutorStatus & exec_status); + PipelineExecGroup build(); Block getCurrentHeader(); }; diff --git a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp index 3d1494211f3..dc8aab43ed2 100644 --- a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp +++ b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp @@ -61,12 +61,12 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest assert(!result_handler.isIgnored()); auto plan_tree = PhysicalGetResultSink::build(result_handler, physical_plan.outputAndOptimize()); - PipelineExecGroupBuilder group_builder; + PipelineExecGroupBuilder group_builder{exec_status}; PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) { assert(plan); plan->buildPipelineExec(group_builder, context.context, /*concurrency=*/1); }); - auto result = group_builder.build(exec_status); + auto result = group_builder.build(); assert(result.size() == 1); return {std::move(plan_tree), std::move(result.back())}; } diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 50e00d609ec..24f7108c3dc 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -77,10 +77,10 @@ void Pipeline::addGetResultSink(ResultHandler result_handler) PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency) { assert(!plan_nodes.empty()); - PipelineExecGroupBuilder builder; + PipelineExecGroupBuilder builder{exec_status}; for (const auto & plan_node : plan_nodes) plan_node->buildPipelineExec(builder, context, concurrency); - return builder.build(exec_status); + return builder.build(); } Events Pipeline::toEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency) diff --git a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp index fcb8a028ecb..323ad025bc8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp @@ -64,7 +64,7 @@ void PhysicalFilter::buildPipelineExec(PipelineExecGroupBuilder & group_builder, { auto input_header = group_builder.getCurrentHeader(); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(input_header, before_filter_actions, filter_column, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, input_header, before_filter_actions, filter_column, log->identifier())); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp index 56ca2f6a325..4f6e17fd72a 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp @@ -28,7 +28,7 @@ PhysicalPlanNodePtr PhysicalGetResultSink::build( void PhysicalGetResultSink::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) { group_builder.transform([&](auto & builder) { - builder.setSinkOp(std::make_unique(*this)); + builder.setSinkOp(std::make_unique(group_builder.exec_status, *this)); }); } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp index 3bc65878fe5..0314d79ea3f 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp @@ -58,7 +58,7 @@ void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder, auto input_header = group_builder.getCurrentHeader(); auto global_limit = std::make_shared(input_header, limit); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(global_limit, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, global_limit, log->identifier())); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp index b5fef14a1df..ae109b35207 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp @@ -68,7 +68,7 @@ void PhysicalMockExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder & group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(mock_streams[i++])); + builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index c84af203d46..4a1143686f5 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -114,7 +114,7 @@ void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_b group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(mock_streams[i++])); + builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp index 18f7fcb81c7..dccfc7bd7b5 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp @@ -148,7 +148,7 @@ void PhysicalProjection::buildPipelineExec(PipelineExecGroupBuilder & group_buil if (project_actions && !project_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(project_actions, log->identifier())); + builder.appendTransformOp(std::make_unique(group_builder.exec_status, project_actions, log->identifier())); }); } } diff --git a/dbms/src/Operators/BlockInputStreamSourceOp.cpp b/dbms/src/Operators/BlockInputStreamSourceOp.cpp index 44fd9426069..d3dafe1dad4 100644 --- a/dbms/src/Operators/BlockInputStreamSourceOp.cpp +++ b/dbms/src/Operators/BlockInputStreamSourceOp.cpp @@ -18,8 +18,10 @@ namespace DB { BlockInputStreamSourceOp::BlockInputStreamSourceOp( + PipelineExecutorStatus & exec_status_, const BlockInputStreamPtr & impl_) - : impl(impl_) + : SourceOp(exec_status_) + , impl(impl_) { impl->readPrefix(); setHeader(impl->getHeader()); diff --git a/dbms/src/Operators/BlockInputStreamSourceOp.h b/dbms/src/Operators/BlockInputStreamSourceOp.h index d61f0f77451..5bc916c9743 100644 --- a/dbms/src/Operators/BlockInputStreamSourceOp.h +++ b/dbms/src/Operators/BlockInputStreamSourceOp.h @@ -27,7 +27,7 @@ using BlockInputStreamPtr = std::shared_ptr; class BlockInputStreamSourceOp : public SourceOp { public: - explicit BlockInputStreamSourceOp(const BlockInputStreamPtr & impl_); + BlockInputStreamSourceOp(PipelineExecutorStatus & exec_status_, const BlockInputStreamPtr & impl_); String getName() const override { diff --git a/dbms/src/Operators/ExpressionTransformOp.h b/dbms/src/Operators/ExpressionTransformOp.h index 918e90d76bf..3993a6709b0 100644 --- a/dbms/src/Operators/ExpressionTransformOp.h +++ b/dbms/src/Operators/ExpressionTransformOp.h @@ -26,9 +26,11 @@ class ExpressionTransformOp : public TransformOp { public: ExpressionTransformOp( + PipelineExecutorStatus & exec_status_, const ExpressionActionsPtr & expression_, const String & req_id) - : expression(expression_) + : TransformOp(exec_status_) + , expression(expression_) , log(Logger::get(req_id)) {} diff --git a/dbms/src/Operators/FilterTransformOp.h b/dbms/src/Operators/FilterTransformOp.h index dbc1714678c..982934543cf 100644 --- a/dbms/src/Operators/FilterTransformOp.h +++ b/dbms/src/Operators/FilterTransformOp.h @@ -24,11 +24,13 @@ class FilterTransformOp : public TransformOp { public: FilterTransformOp( + PipelineExecutorStatus & exec_status_, const Block & input_header, const ExpressionActionsPtr & expression, const String & filter_column_name, const String & req_id) - : filter_transform_action(input_header, expression, filter_column_name) + : TransformOp(exec_status_) + , filter_transform_action(input_header, expression, filter_column_name) , log(Logger::get(req_id)) {} diff --git a/dbms/src/Operators/GetResultSinkOp.h b/dbms/src/Operators/GetResultSinkOp.h index fd013c2cbd6..b00770b8a5b 100644 --- a/dbms/src/Operators/GetResultSinkOp.h +++ b/dbms/src/Operators/GetResultSinkOp.h @@ -25,8 +25,11 @@ class PhysicalGetResultSink; class GetResultSinkOp : public SinkOp { public: - explicit GetResultSinkOp(PhysicalGetResultSink & physical_sink_) - : physical_sink(physical_sink_) + GetResultSinkOp( + PipelineExecutorStatus & exec_status_, + PhysicalGetResultSink & physical_sink_) + : SinkOp(exec_status_) + , physical_sink(physical_sink_) { } diff --git a/dbms/src/Operators/LimitTransformOp.h b/dbms/src/Operators/LimitTransformOp.h index 716c34b8b1c..3904086ac39 100644 --- a/dbms/src/Operators/LimitTransformOp.h +++ b/dbms/src/Operators/LimitTransformOp.h @@ -26,9 +26,11 @@ class LimitTransformOp : public TransformOp { public: LimitTransformOp( + PipelineExecutorStatus & exec_status_, const GlobalLimitPtr & action_, const String & req_id) - : action(action_) + : TransformOp(exec_status_) + , action(action_) , log(Logger::get(req_id)) {} diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index 03aed506df5..88426df7032 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -12,13 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include namespace DB { +#define CHECK_IS_CANCELLED \ + if (unlikely(exec_status.isCancelled())) \ + return OperatorStatus::CANCELLED; + OperatorStatus Operator::await() { + CHECK_IS_CANCELLED // TODO collect operator profile info here. auto op_status = awaitImpl(); #ifndef NDEBUG @@ -29,6 +36,7 @@ OperatorStatus Operator::await() OperatorStatus SourceOp::read(Block & block) { + CHECK_IS_CANCELLED // TODO collect operator profile info here. assert(!block); auto op_status = readImpl(block); @@ -45,6 +53,7 @@ OperatorStatus SourceOp::read(Block & block) OperatorStatus TransformOp::transform(Block & block) { + CHECK_IS_CANCELLED // TODO collect operator profile info here. auto op_status = transformImpl(block); #ifndef NDEBUG @@ -60,6 +69,7 @@ OperatorStatus TransformOp::transform(Block & block) OperatorStatus TransformOp::tryOutput(Block & block) { + CHECK_IS_CANCELLED // TODO collect operator profile info here. assert(!block); auto op_status = tryOutputImpl(block); @@ -76,6 +86,7 @@ OperatorStatus TransformOp::tryOutput(Block & block) OperatorStatus SinkOp::prepare() { + CHECK_IS_CANCELLED // TODO collect operator profile info here. auto op_status = prepareImpl(); #ifndef NDEBUG @@ -86,6 +97,7 @@ OperatorStatus SinkOp::prepare() OperatorStatus SinkOp::write(Block && block) { + CHECK_IS_CANCELLED #ifndef NDEBUG if (block) { @@ -100,4 +112,7 @@ OperatorStatus SinkOp::write(Block && block) #endif return op_status; } + +#undef CHECK_IS_CANCELLED + } // namespace DB diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index c2cc81e863f..cba082151ea 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -41,9 +41,15 @@ enum class OperatorStatus // TODO support operator profile info like `BlockStreamProfileInfo`. +class PipelineExecutorStatus; + class Operator { public: + explicit Operator(PipelineExecutorStatus & exec_status_) + : exec_status(exec_status_) + {} + virtual ~Operator() = default; // running status may return are // - `NEED_INPUT` means that the data that the operator is waiting for has been prepared. @@ -67,6 +73,7 @@ class Operator } protected: + PipelineExecutorStatus & exec_status; Block header; }; @@ -74,6 +81,9 @@ class Operator class SourceOp : public Operator { public: + explicit SourceOp(PipelineExecutorStatus & exec_status_) + : Operator(exec_status_) + {} // read will inplace the block when return status is HAS_OUTPUT; // Even after source has finished, source op still needs to return an empty block and HAS_OUTPUT, // because there are many operators that need an empty block as input, such as JoinProbe and WindowFunction. @@ -87,6 +97,9 @@ using SourceOpPtr = std::unique_ptr; class TransformOp : public Operator { public: + explicit TransformOp(PipelineExecutorStatus & exec_status_) + : Operator(exec_status_) + {} // running status may return are NEED_INPUT and HAS_OUTPUT here. // tryOutput will inplace the block when return status is HAS_OUPUT; do nothing to the block when NEED_INPUT or others. OperatorStatus tryOutput(Block &); @@ -115,6 +128,9 @@ using TransformOps = std::vector; class SinkOp : public Operator { public: + explicit SinkOp(PipelineExecutorStatus & exec_status_) + : Operator(exec_status_) + {} OperatorStatus prepare(); virtual OperatorStatus prepareImpl() { return OperatorStatus::NEED_INPUT; }