Skip to content

Commit

Permalink
Move exec_status from PipelineExec to Operator (pingcap#6711)
Browse files Browse the repository at this point in the history
ref pingcap#6518

Signed-off-by: ywqzzy <[email protected]>
  • Loading branch information
SeaRise authored and ywqzzy committed Feb 13, 2023
1 parent e030b15 commit d1a91f6
Show file tree
Hide file tree
Showing 20 changed files with 71 additions and 45 deletions.
15 changes: 0 additions & 15 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Executor/PipelineExecutorStatus.h>
#include <Flash/Pipeline/Exec/PipelineExec.h>
#include <Operators/OperatorHelper.h>

namespace DB
{
#define CHECK_IS_CANCELLED \
if (unlikely(exec_status.isCancelled())) \
return OperatorStatus::CANCELLED;

OperatorStatus PipelineExec::execute()
{
auto op_status = executeImpl();
Expand Down Expand Up @@ -50,13 +45,11 @@ OperatorStatus PipelineExec::executeImpl()
// start from the next transform op after fetched block transform op.
for (size_t transform_op_index = start_transform_op_index; transform_op_index < transform_ops.size(); ++transform_op_index)
{
CHECK_IS_CANCELLED;
const auto & transform_op = transform_ops[transform_op_index];
op_status = transform_op->transform(block);
if (op_status != OperatorStatus::HAS_OUTPUT)
return op_status;
}
CHECK_IS_CANCELLED;
op_status = sink_op->write(std::move(block));
return op_status;
}
Expand All @@ -66,13 +59,11 @@ OperatorStatus PipelineExec::fetchBlock(
Block & block,
size_t & start_transform_op_index)
{
CHECK_IS_CANCELLED;
auto op_status = sink_op->prepare();
if (op_status != OperatorStatus::NEED_INPUT)
return op_status;
for (int64_t index = transform_ops.size() - 1; index >= 0; --index)
{
CHECK_IS_CANCELLED;
const auto & transform_op = transform_ops[index];
op_status = transform_op->tryOutput(block);
if (op_status != OperatorStatus::NEED_INPUT)
Expand All @@ -82,7 +73,6 @@ OperatorStatus PipelineExec::fetchBlock(
return op_status;
}
}
CHECK_IS_CANCELLED;
start_transform_op_index = 0;
op_status = source_op->read(block);
return op_status;
Expand All @@ -99,8 +89,6 @@ OperatorStatus PipelineExec::await()
}
OperatorStatus PipelineExec::awaitImpl()
{
CHECK_IS_CANCELLED;

auto op_status = sink_op->await();
if (op_status != OperatorStatus::NEED_INPUT)
return op_status;
Expand All @@ -115,7 +103,4 @@ OperatorStatus PipelineExec::awaitImpl()
op_status = source_op->await();
return op_status;
}

#undef CHECK_IS_CANCELLED

} // namespace DB
8 changes: 1 addition & 7 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@

namespace DB
{
class PipelineExecutorStatus;

// The executor for push model operator.
// A pipeline will generate multiple pipeline_execs.
// data flow: source --> transform --> .. --> transform --> sink
class PipelineExec
{
public:
PipelineExec(
PipelineExecutorStatus & exec_status_,
SourceOpPtr && source_op_,
TransformOps && transform_ops_,
SinkOpPtr && sink_op_)
: exec_status(exec_status_)
, source_op(std::move(source_op_))
: source_op(std::move(source_op_))
, transform_ops(std::move(transform_ops_))
, sink_op(std::move(sink_op_))
{}
Expand All @@ -53,8 +49,6 @@ class PipelineExec
size_t & start_transform_op_index);

private:
PipelineExecutorStatus & exec_status;

SourceOpPtr source_op;
TransformOps transform_ops;
SinkOpPtr sink_op;
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ void PipelineExecBuilder::setSinkOp(SinkOpPtr && sink_op_)
sink_op = std::move(sink_op_);
}

PipelineExecPtr PipelineExecBuilder::build(PipelineExecutorStatus & exec_status)
PipelineExecPtr PipelineExecBuilder::build()
{
assert(source_op && sink_op);
return std::make_unique<PipelineExec>(
exec_status,
std::move(source_op),
std::move(transform_ops),
std::move(sink_op));
Expand All @@ -67,12 +66,12 @@ void PipelineExecGroupBuilder::init(size_t init_concurrency)
group.resize(concurrency);
}

PipelineExecGroup PipelineExecGroupBuilder::build(PipelineExecutorStatus & exec_status)
PipelineExecGroup PipelineExecGroupBuilder::build()
{
assert(concurrency > 0);
PipelineExecGroup pipeline_exec_group;
for (auto & builder : group)
pipeline_exec_group.push_back(builder.build(exec_status));
pipeline_exec_group.push_back(builder.build());
return pipeline_exec_group;
}

Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct PipelineExecBuilder

Block getCurrentHeader() const;

PipelineExecPtr build(PipelineExecutorStatus & exec_status);
PipelineExecPtr build();
};

struct PipelineExecGroupBuilder
Expand All @@ -41,6 +41,12 @@ struct PipelineExecGroupBuilder
using BuilderGroup = std::vector<PipelineExecBuilder>;
BuilderGroup group;

explicit PipelineExecGroupBuilder(PipelineExecutorStatus & exec_status_)
: exec_status(exec_status_)
{}

PipelineExecutorStatus & exec_status;

size_t concurrency = 0;

void init(size_t init_concurrency);
Expand All @@ -56,7 +62,7 @@ struct PipelineExecGroupBuilder
}
}

PipelineExecGroup build(PipelineExecutorStatus & exec_status);
PipelineExecGroup build();

Block getCurrentHeader();
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
assert(!result_handler.isIgnored());
auto plan_tree = PhysicalGetResultSink::build(result_handler, physical_plan.outputAndOptimize());

PipelineExecGroupBuilder group_builder;
PipelineExecGroupBuilder group_builder{exec_status};
PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
plan->buildPipelineExec(group_builder, context.context, /*concurrency=*/1);
});
auto result = group_builder.build(exec_status);
auto result = group_builder.build();
assert(result.size() == 1);
return {std::move(plan_tree), std::move(result.back())};
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ void Pipeline::addGetResultSink(ResultHandler result_handler)
PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency)
{
assert(!plan_nodes.empty());
PipelineExecGroupBuilder builder;
PipelineExecGroupBuilder builder{exec_status};
for (const auto & plan_node : plan_nodes)
plan_node->buildPipelineExec(builder, context, concurrency);
return builder.build(exec_status);
return builder.build();
}

Events Pipeline::toEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void PhysicalFilter::buildPipelineExec(PipelineExecGroupBuilder & group_builder,
{
auto input_header = group_builder.getCurrentHeader();
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<FilterTransformOp>(input_header, before_filter_actions, filter_column, log->identifier()));
builder.appendTransformOp(std::make_unique<FilterTransformOp>(group_builder.exec_status, input_header, before_filter_actions, filter_column, log->identifier()));
});
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ PhysicalPlanNodePtr PhysicalGetResultSink::build(
void PhysicalGetResultSink::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
{
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<GetResultSinkOp>(*this));
builder.setSinkOp(std::make_unique<GetResultSinkOp>(group_builder.exec_status, *this));
});
}
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder,
auto input_header = group_builder.getCurrentHeader();
auto global_limit = std::make_shared<GlobalLimitTransformAction>(input_header, limit);
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<LimitTransformOp>(global_limit, log->identifier()));
builder.appendTransformOp(std::make_unique<LimitTransformOp>(group_builder.exec_status, global_limit, log->identifier()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void PhysicalMockExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder &
group_builder.init(mock_streams.size());
size_t i = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(mock_streams[i++]));
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(group_builder.exec_status, mock_streams[i++]));
});
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_b
group_builder.init(mock_streams.size());
size_t i = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(mock_streams[i++]));
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(group_builder.exec_status, mock_streams[i++]));
});
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void PhysicalProjection::buildPipelineExec(PipelineExecGroupBuilder & group_buil
if (project_actions && !project_actions->getActions().empty())
{
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(project_actions, log->identifier()));
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(group_builder.exec_status, project_actions, log->identifier()));
});
}
}
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Operators/BlockInputStreamSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
namespace DB
{
BlockInputStreamSourceOp::BlockInputStreamSourceOp(
PipelineExecutorStatus & exec_status_,
const BlockInputStreamPtr & impl_)
: impl(impl_)
: SourceOp(exec_status_)
, impl(impl_)
{
impl->readPrefix();
setHeader(impl->getHeader());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Operators/BlockInputStreamSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
class BlockInputStreamSourceOp : public SourceOp
{
public:
explicit BlockInputStreamSourceOp(const BlockInputStreamPtr & impl_);
BlockInputStreamSourceOp(PipelineExecutorStatus & exec_status_, const BlockInputStreamPtr & impl_);

String getName() const override
{
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Operators/ExpressionTransformOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ class ExpressionTransformOp : public TransformOp
{
public:
ExpressionTransformOp(
PipelineExecutorStatus & exec_status_,
const ExpressionActionsPtr & expression_,
const String & req_id)
: expression(expression_)
: TransformOp(exec_status_)
, expression(expression_)
, log(Logger::get(req_id))
{}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Operators/FilterTransformOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ class FilterTransformOp : public TransformOp
{
public:
FilterTransformOp(
PipelineExecutorStatus & exec_status_,
const Block & input_header,
const ExpressionActionsPtr & expression,
const String & filter_column_name,
const String & req_id)
: filter_transform_action(input_header, expression, filter_column_name)
: TransformOp(exec_status_)
, filter_transform_action(input_header, expression, filter_column_name)
, log(Logger::get(req_id))
{}

Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Operators/GetResultSinkOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ class PhysicalGetResultSink;
class GetResultSinkOp : public SinkOp
{
public:
explicit GetResultSinkOp(PhysicalGetResultSink & physical_sink_)
: physical_sink(physical_sink_)
GetResultSinkOp(
PipelineExecutorStatus & exec_status_,
PhysicalGetResultSink & physical_sink_)
: SinkOp(exec_status_)
, physical_sink(physical_sink_)
{
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Operators/LimitTransformOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ class LimitTransformOp : public TransformOp
{
public:
LimitTransformOp(
PipelineExecutorStatus & exec_status_,
const GlobalLimitPtr & action_,
const String & req_id)
: action(action_)
: TransformOp(exec_status_)
, action(action_)
, log(Logger::get(req_id))
{}

Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Operators/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Executor/PipelineExecutorStatus.h>
#include <Flash/Pipeline/Exec/PipelineExec.h>
#include <Operators/Operator.h>
#include <Operators/OperatorHelper.h>

namespace DB
{
#define CHECK_IS_CANCELLED \
if (unlikely(exec_status.isCancelled())) \
return OperatorStatus::CANCELLED;

OperatorStatus Operator::await()
{
CHECK_IS_CANCELLED
// TODO collect operator profile info here.
auto op_status = awaitImpl();
#ifndef NDEBUG
Expand All @@ -29,6 +36,7 @@ OperatorStatus Operator::await()

OperatorStatus SourceOp::read(Block & block)
{
CHECK_IS_CANCELLED
// TODO collect operator profile info here.
assert(!block);
auto op_status = readImpl(block);
Expand All @@ -45,6 +53,7 @@ OperatorStatus SourceOp::read(Block & block)

OperatorStatus TransformOp::transform(Block & block)
{
CHECK_IS_CANCELLED
// TODO collect operator profile info here.
auto op_status = transformImpl(block);
#ifndef NDEBUG
Expand All @@ -60,6 +69,7 @@ OperatorStatus TransformOp::transform(Block & block)

OperatorStatus TransformOp::tryOutput(Block & block)
{
CHECK_IS_CANCELLED
// TODO collect operator profile info here.
assert(!block);
auto op_status = tryOutputImpl(block);
Expand All @@ -76,6 +86,7 @@ OperatorStatus TransformOp::tryOutput(Block & block)

OperatorStatus SinkOp::prepare()
{
CHECK_IS_CANCELLED
// TODO collect operator profile info here.
auto op_status = prepareImpl();
#ifndef NDEBUG
Expand All @@ -86,6 +97,7 @@ OperatorStatus SinkOp::prepare()

OperatorStatus SinkOp::write(Block && block)
{
CHECK_IS_CANCELLED
#ifndef NDEBUG
if (block)
{
Expand All @@ -100,4 +112,7 @@ OperatorStatus SinkOp::write(Block && block)
#endif
return op_status;
}

#undef CHECK_IS_CANCELLED

} // namespace DB
Loading

0 comments on commit d1a91f6

Please sign in to comment.