From ac240b4ce0903e422370fd0629290c02972b6211 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 20 Mar 2023 17:52:41 +0800 Subject: [PATCH] Pipeline: support fine grained shuffle (#6934) ref pingcap/tiflash#6518 --- .../MockExecutor/ExchangeSenderBinder.cpp | 38 +++- .../Debug/MockExecutor/ExchangeSenderBinder.h | 14 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 2 +- .../Flash/Coprocessor/FineGrainedShuffle.h | 7 + dbms/src/Flash/Pipeline/Exec/PipelineExec.h | 2 +- .../Flash/Pipeline/Exec/PipelineExecBuilder.h | 8 - .../Exec/tests/gtest_simple_operator.cpp | 6 +- dbms/src/Flash/Pipeline/Pipeline.cpp | 173 ++++++++++++++---- dbms/src/Flash/Pipeline/Pipeline.h | 16 +- .../Events/FineGrainedPipelineEvent.cpp | 26 +++ ...lineEvent.h => FineGrainedPipelineEvent.h} | 29 ++- .../Schedule/Events/PlainPipelineEvent.cpp | 13 +- .../Schedule/Events/PlainPipelineEvent.h | 15 +- .../Pipeline/Schedule/Tasks/EventTask.cpp | 9 +- .../Flash/Pipeline/Schedule/Tasks/EventTask.h | 2 +- dbms/src/Flash/Planner/PhysicalPlan.cpp | 5 +- dbms/src/Flash/Planner/PhysicalPlanNode.cpp | 8 +- dbms/src/Flash/Planner/PhysicalPlanNode.h | 17 +- .../Planner/Plans/PhysicalAggregation.cpp | 4 +- .../Flash/Planner/Plans/PhysicalAggregation.h | 9 +- .../Plans/PhysicalAggregationBuild.cpp | 10 +- .../Planner/Plans/PhysicalAggregationBuild.h | 8 +- .../Plans/PhysicalAggregationConvergent.cpp | 12 +- .../Plans/PhysicalAggregationConvergent.h | 8 +- dbms/src/Flash/Planner/Plans/PhysicalBinary.h | 3 +- .../Plans/PhysicalExchangeReceiver.cpp | 31 ++-- .../Planner/Plans/PhysicalExchangeReceiver.h | 10 +- .../Planner/Plans/PhysicalExchangeSender.cpp | 19 +- .../Planner/Plans/PhysicalExchangeSender.h | 13 +- .../Flash/Planner/Plans/PhysicalExpand.cpp | 10 +- dbms/src/Flash/Planner/Plans/PhysicalExpand.h | 9 +- .../Flash/Planner/Plans/PhysicalFilter.cpp | 9 +- dbms/src/Flash/Planner/Plans/PhysicalFilter.h | 9 +- .../Planner/Plans/PhysicalGetResultSink.cpp | 17 +- .../Planner/Plans/PhysicalGetResultSink.h | 10 +- dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp | 4 +- dbms/src/Flash/Planner/Plans/PhysicalJoin.h | 8 +- dbms/src/Flash/Planner/Plans/PhysicalLeaf.h | 3 +- .../src/Flash/Planner/Plans/PhysicalLimit.cpp | 9 +- dbms/src/Flash/Planner/Plans/PhysicalLimit.h | 9 +- .../Plans/PhysicalMockExchangeReceiver.cpp | 16 +- .../Plans/PhysicalMockExchangeReceiver.h | 9 +- .../Plans/PhysicalMockExchangeSender.cpp | 1 + .../Plans/PhysicalMockExchangeSender.h | 10 +- .../Planner/Plans/PhysicalMockTableScan.cpp | 12 +- .../Planner/Plans/PhysicalMockTableScan.h | 6 +- .../Planner/Plans/PhysicalProjection.cpp | 11 +- .../Flash/Planner/Plans/PhysicalProjection.h | 9 +- .../Flash/Planner/Plans/PhysicalTableScan.cpp | 2 +- dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp | 11 +- dbms/src/Flash/Planner/Plans/PhysicalTopN.h | 9 +- dbms/src/Flash/Planner/Plans/PhysicalUnary.h | 3 +- .../Flash/Planner/Plans/PhysicalWindow.cpp | 4 +- dbms/src/Flash/Planner/Plans/PhysicalWindow.h | 8 +- .../Planner/Plans/PhysicalWindowSort.cpp | 4 +- .../Flash/Planner/Plans/PhysicalWindowSort.h | 8 +- dbms/src/Flash/tests/gtest_compute_server.cpp | 20 +- .../tests/gtest_fine_grained_shuffle.cpp | 90 +++++++++ .../src/Flash/tests/gtest_window_executor.cpp | 2 - dbms/src/TestUtils/ExecutorTestUtils.cpp | 7 +- dbms/src/TestUtils/mockExecutor.cpp | 9 +- dbms/src/TestUtils/mockExecutor.h | 3 +- .../config/tiflash_dt_enable_pipeline.toml | 55 ++++++ tests/docker/tiflash-dt-enable-pipeline.yaml | 43 +++++ tests/tidb-ci/enable_pipeline | 1 + tests/tidb-ci/run.sh | 7 + tests/tidb-ci/tiflash-dt-enable-pipeline.yaml | 1 + 67 files changed, 759 insertions(+), 216 deletions(-) create mode 100644 dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.cpp rename dbms/src/Flash/Pipeline/Schedule/Events/{PipelineEvent.h => FineGrainedPipelineEvent.h} (56%) create mode 100644 dbms/src/Flash/tests/gtest_fine_grained_shuffle.cpp create mode 100644 tests/docker/config/tiflash_dt_enable_pipeline.toml create mode 100644 tests/docker/tiflash-dt-enable-pipeline.yaml create mode 120000 tests/tidb-ci/enable_pipeline create mode 120000 tests/tidb-ci/tiflash-dt-enable-pipeline.yaml diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp index 88bfc19c4fb..ef97c6b3e79 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp @@ -77,9 +77,43 @@ tipb::ExchangeType ExchangeSenderBinder::getType() const return type; } -ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type) +ExecutorBinderPtr compileExchangeSender( + ExecutorBinderPtr input, + size_t & executor_index, + tipb::ExchangeType exchange_type, + ASTPtr partition_key_list, + uint64_t fine_grained_shuffle_stream_count) { - ExecutorBinderPtr exchange_sender = std::make_shared(executor_index, input->output_schema, exchange_type); + std::vector partition_key_indexes; + for (const auto & partition_key : partition_key_list->children) + { + size_t schema_index = 0; + for (; schema_index < input->output_schema.size(); ++schema_index) + { + if (input->output_schema[schema_index].first == partition_key->getColumnName()) + { + partition_key_indexes.push_back(schema_index); + break; + } + } + auto schema_string = [&]() { + FmtBuffer buffer; + buffer.joinStr( + input->output_schema.cbegin(), + input->output_schema.cend(), + [](const auto & item, FmtBuffer & buf) { buf.append(item.first); }, + ", "); + return buffer.toString(); + }; + if (schema_index == input->output_schema.size()) + throw Exception(fmt::format("Unknown partition key: {}, schema is [{}]", partition_key->getColumnName(), schema_string())); + } + ExecutorBinderPtr exchange_sender = std::make_shared( + executor_index, + input->output_schema, + exchange_type, + partition_key_indexes, + fine_grained_shuffle_stream_count); exchange_sender->children.push_back(input); return exchange_sender; } diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h index c03899351eb..f6d10599614 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h @@ -22,7 +22,12 @@ namespace DB::mock class ExchangeSenderBinder : public ExecutorBinder { public: - ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0) + ExchangeSenderBinder( + size_t & index, + const DAGSchema & output, + tipb::ExchangeType type_, + const std::vector & partition_keys_ = {}, + uint64_t fine_grained_shuffle_stream_count_ = 0) : ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output) , type(type_) , partition_keys(partition_keys_) @@ -42,5 +47,10 @@ class ExchangeSenderBinder : public ExecutorBinder uint64_t fine_grained_shuffle_stream_count; }; -ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type); +ExecutorBinderPtr compileExchangeSender( + ExecutorBinderPtr input, + size_t & executor_index, + tipb::ExchangeType exchange_type, + ASTPtr partition_key_list = {}, + uint64_t fine_grained_shuffle_stream_count = 0); } // namespace DB::mock diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index c805dd7cd50..5a10c639a97 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -783,7 +783,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline) { extra_info = String(enableFineGrainedShuffleExtraInfo); RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_sender.tp())); - RUNTIME_CHECK(stream_count <= 1024, stream_count); + RUNTIME_CHECK(stream_count <= maxFineGrainedStreamCount, stream_count); } pipeline.transform([&](auto & stream) { // construct writer diff --git a/dbms/src/Flash/Coprocessor/FineGrainedShuffle.h b/dbms/src/Flash/Coprocessor/FineGrainedShuffle.h index c6bced12ba4..f6415442efc 100644 --- a/dbms/src/Flash/Coprocessor/FineGrainedShuffle.h +++ b/dbms/src/Flash/Coprocessor/FineGrainedShuffle.h @@ -21,6 +21,8 @@ namespace DB { static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; +static constexpr size_t maxFineGrainedStreamCount = 1024; + inline bool enableFineGrainedShuffle(uint64_t stream_count) { return stream_count > 0; @@ -28,6 +30,11 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count) struct FineGrainedShuffle { + FineGrainedShuffle() + : stream_count(0) + , batch_size(0) + {} + explicit FineGrainedShuffle(const tipb::Executor * executor) : stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0) , batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0) diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h index 0230cfe10da..7fb845b3df1 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h @@ -23,7 +23,7 @@ namespace DB // The executor for push model operator. // A pipeline will generate multiple pipeline_execs. // data flow: source --> transform --> .. --> transform --> sink -class PipelineExec +class PipelineExec : private boost::noncopyable { public: PipelineExec( diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h index 4f0a986a932..1211b387125 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h @@ -18,8 +18,6 @@ namespace DB { -class PipelineExecutorStatus; - struct PipelineExecBuilder { SourceOpPtr source_op; @@ -41,12 +39,6 @@ struct PipelineExecGroupBuilder using BuilderGroup = std::vector; BuilderGroup group; - explicit PipelineExecGroupBuilder(PipelineExecutorStatus & exec_status_) - : exec_status(exec_status_) - {} - - PipelineExecutorStatus & exec_status; - size_t concurrency = 0; void init(size_t init_concurrency); diff --git a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp index d60be409163..7bcaf23210a 100644 --- a/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp +++ b/dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp @@ -60,12 +60,12 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest PhysicalPlan physical_plan{*context.context, ""}; physical_plan.build(request.get()); assert(!result_handler.isIgnored()); - auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), physical_plan.outputAndOptimize()); + auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), Logger::get(), physical_plan.outputAndOptimize()); - PipelineExecGroupBuilder group_builder{exec_status}; + PipelineExecGroupBuilder group_builder; PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) { assert(plan); - plan->buildPipelineExec(group_builder, *context.context, /*concurrency=*/1); + plan->buildPipelineExecGroup(exec_status, group_builder, *context.context, /*concurrency=*/1); }); auto result = group_builder.build(); assert(result.size() == 1); diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 71756ab6699..2684643c47a 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,85 @@ FmtBuffer & addPrefix(FmtBuffer & buffer, size_t level) } } // namespace +PipelineEvents::PipelineEvents(Events && events_, bool is_fine_grained_) + : events(std::move(events_)) + , is_fine_grained(is_fine_grained_) +{ + RUNTIME_CHECK(!events.empty()); + // For non fine grained mode, the size of events must be 1. + RUNTIME_CHECK(is_fine_grained || events.size() == 1); +} + +void PipelineEvents::mapInputs(const PipelineEvents & inputs) +{ + /// The self events is output. + if (inputs.is_fine_grained && is_fine_grained) + { + if (inputs.events.size() == events.size()) + { + /** + * 1. If the number of partitions match, use fine grained mapping here. + * ``` + * FineGrainedPipelineEvent◄────FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄────FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄────FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄────FineGrainedPipelineEvent + * ``` + */ + size_t partition_num = inputs.events.size(); + for (size_t index = 0; index < partition_num; ++index) + events[index]->addInput(inputs.events[index]); + } + else + { + /** + * 2. If the number of partitions does not match, it is safer to use full mapping. + * ``` + * FineGrainedPipelineEvent◄──┐ ┌──FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄──┼─┼──FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄──┤ └──FineGrainedPipelineEvent + * FineGrainedPipelineEvent◄──┘ + * ``` + */ + for (const auto & output : events) + { + for (const auto & input : inputs.events) + output->addInput(input); + } + } + } + else + { + /** + * Use full mapping here. + * 1. for non fine grained inputs and non fine grained outputs + * The size of inputs and outputs must be the same and 1. + * ``` + * PlainPipelineEvent◄────PlainPipelineEvent + * ``` + * 2. for non fine grained inputs and fine grained outputs + * This is not possible, if fine-grained is enabled in outputs, then inputs must also be enabled. + * 3. for fine grained inputs and non fine grained outputs + * ``` + * ┌──FineGrainedPipelineEvent + * PlainPipelineEvent◄──┼──FineGrainedPipelineEvent + * ├──FineGrainedPipelineEvent + * └──FineGrainedPipelineEvent + * + * PlainPipelineEvent◄────FineGrainedPipelineEvent + * ``` + */ + + // If the outputs is fine grained model, the intputs must also be. + RUNTIME_CHECK(inputs.is_fine_grained || !is_fine_grained); + for (const auto & output : events) + { + for (const auto & input : inputs.events) + output->addInput(input); + } + } +} + void Pipeline::addPlanNode(const PhysicalPlanNodePtr & plan_node) { plan_nodes.push_back(plan_node); @@ -76,50 +156,76 @@ void Pipeline::toTreeString(FmtBuffer & buffer, size_t level) const void Pipeline::addGetResultSink(ResultHandler && result_handler) { assert(!plan_nodes.empty()); - auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), plan_nodes.back()); + auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), log, plan_nodes.back()); addPlanNode(get_result_sink); } PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency) { assert(!plan_nodes.empty()); - PipelineExecGroupBuilder builder{exec_status}; + PipelineExecGroupBuilder builder; for (const auto & plan_node : plan_nodes) - plan_node->buildPipelineExec(builder, context, concurrency); + plan_node->buildPipelineExecGroup(exec_status, builder, context, concurrency); return builder.build(); } +/** + * There are two execution modes in pipeline. + * 1. non fine grained mode + * A pipeline generates an event(PlainPipelineEvent). + * This means that all the operators in the pipeline are finished before the next pipeline is triggered. + * 2. fine grained mode + * A pipeline will generate n Events(FineGrainedPipelineEvent), one for each data partition. + * There is a fine-grained mapping of Events between Pipelines, e.g. only Events from the same data partition will have dependencies on each other. + * This means that once some data partition of the previous pipeline has finished, the operators of the next pipeline's corresponding data partition can be started without having to wait for the entire pipeline to finish. + * + * ┌──non fine grained mode──┐ ┌──fine grained mode──┐ + * ┌──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent + * PlainPipelineEvent◄───PlainPipelineEvent◄──┼──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent + * └──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent + */ +bool Pipeline::isFineGrainedMode() const +{ + assert(!plan_nodes.empty()); + // The source plan node determines whether the execution mode is fine grained or non-fine grained. + return plan_nodes.front()->getFineGrainedShuffle().enable(); +} + Events Pipeline::toEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency) { Events all_events; - toEvent(status, context, concurrency, all_events); + doToEvents(status, context, concurrency, all_events); assert(!all_events.empty()); return all_events; } -EventPtr Pipeline::toEvent(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events) -{ - // TODO support fine grained shuffle - // - a fine grained partition maps to an event - // - the event flow will be - // ``` - // disable fine grained partition pipeline enable fine grained partition pipeline enable fine grained partition pipeline - // ┌───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent - // PlainPipelineEvent<────────┼───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent - // ├───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent - // └───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent - // ``` +PipelineEvents Pipeline::toSelfEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency) +{ auto memory_tracker = current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr; - - auto plain_pipeline_event = std::make_shared(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency); - for (const auto & child : children) + Events self_events; + assert(!plan_nodes.empty()); + if (isFineGrainedMode()) { - auto input = child->toEvent(status, context, concurrency, all_events); - assert(input); - plain_pipeline_event->addInput(input); + auto fine_grained_exec_group = buildExecGroup(status, context, concurrency); + for (auto & pipeline_exec : fine_grained_exec_group) + self_events.push_back(std::make_shared(status, memory_tracker, log->identifier(), std::move(pipeline_exec))); + LOG_DEBUG(log, "Execute in fine grained model and generate {} fine grained pipeline event", self_events.size()); } - all_events.push_back(plain_pipeline_event); - return plain_pipeline_event; + else + { + self_events.push_back(std::make_shared(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency)); + LOG_DEBUG(log, "Execute in non fine grained model and generate one plain pipeline event"); + } + return {std::move(self_events), isFineGrainedMode()}; +} + +PipelineEvents Pipeline::doToEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events) +{ + auto self_events = toSelfEvents(status, context, concurrency); + for (const auto & child : children) + self_events.mapInputs(child->doToEvents(status, context, concurrency, all_events)); + all_events.insert(all_events.end(), self_events.events.cbegin(), self_events.events.cend()); + return self_events; } bool Pipeline::isSupported(const tipb::DAGRequest & dag_request) @@ -128,29 +234,26 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request) traverseExecutors( &dag_request, [&](const tipb::Executor & executor) { - // TODO support fine grained shuffle. - if (FineGrainedShuffle(&executor).enable()) - { - is_supported = false; - return false; - } switch (executor.tp()) { - case tipb::ExecType::TypeProjection: - case tipb::ExecType::TypeSelection: - case tipb::ExecType::TypeLimit: - case tipb::ExecType::TypeTopN: - case tipb::ExecType::TypeAggregation: case tipb::ExecType::TypeTableScan: if (executor.tbl_scan().keep_order()) { is_supported = false; return false; } + case tipb::ExecType::TypeProjection: + case tipb::ExecType::TypeSelection: + case tipb::ExecType::TypeLimit: + case tipb::ExecType::TypeTopN: case tipb::ExecType::TypeExchangeSender: case tipb::ExecType::TypeExchangeReceiver: case tipb::ExecType::TypeExpand: return true; + case tipb::ExecType::TypeAggregation: + // TODO support fine grained shuffle. + if (!FineGrainedShuffle(&executor).enable()) + return true; default: is_supported = false; return false; diff --git a/dbms/src/Flash/Pipeline/Pipeline.h b/dbms/src/Flash/Pipeline/Pipeline.h index 37504b29b6a..2f9a4a394bf 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.h +++ b/dbms/src/Flash/Pipeline/Pipeline.h @@ -40,8 +40,15 @@ using PhysicalPlanNodePtr = std::shared_ptr; class PipelineExecutorStatus; -class SharedQueue; -using SharedQueuePtr = std::shared_ptr; +struct PipelineEvents +{ + Events events; + bool is_fine_grained; + + PipelineEvents(Events && events_, bool is_fine_grained_); + + void mapInputs(const PipelineEvents & inputs); +}; class Pipeline : public std::enable_shared_from_this { @@ -68,10 +75,13 @@ class Pipeline : public std::enable_shared_from_this Block getSampleBlock() const; + bool isFineGrainedMode() const; + private: void toSelfString(FmtBuffer & buffer, size_t level) const; - EventPtr toEvent(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events); + PipelineEvents toSelfEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency); + PipelineEvents doToEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events); private: const UInt32 id; diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.cpp new file mode 100644 index 00000000000..3cd63cf6c59 --- /dev/null +++ b/dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.cpp @@ -0,0 +1,26 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +std::vector FineGrainedPipelineEvent::scheduleImpl() +{ + std::vector tasks; + tasks.push_back(std::make_unique(mem_tracker, log->identifier(), exec_status, shared_from_this(), std::move(pipeline_exec))); + return tasks; +} +} // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.h similarity index 56% rename from dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h rename to dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.h index d3631bbd3b5..0c85ec024a4 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PipelineEvent.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.h @@ -14,37 +14,30 @@ #pragma once +#include #include namespace DB { -class Pipeline; -using PipelinePtr = std::shared_ptr; - -// The base class of pipeline related event. -class PipelineEvent : public Event +class FineGrainedPipelineEvent : public Event { public: - PipelineEvent( + FineGrainedPipelineEvent( PipelineExecutorStatus & exec_status_, MemoryTrackerPtr mem_tracker_, const String & req_id, - Context & context_, - const PipelinePtr & pipeline_) + PipelineExecPtr && pipeline_exec_) : Event(exec_status_, std::move(mem_tracker_), req_id) - , context(context_) - , pipeline(pipeline_) - {} - - void finishImpl() override + , pipeline_exec(std::move(pipeline_exec_)) { - // Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.onEventFinish()` is called. - // So pipeline needs to be released before `exec_status.onEventFinish()` is called. - pipeline.reset(); + assert(pipeline_exec); } protected: - Context & context; - PipelinePtr pipeline; + std::vector scheduleImpl() override; + +private: + // The pipeline exec for executing the specific fine-grained partition. + PipelineExecPtr pipeline_exec; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp index bea061b551c..5485cf59be6 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.cpp @@ -22,11 +22,18 @@ std::vector PlainPipelineEvent::scheduleImpl() { assert(pipeline); auto pipeline_exec_group = pipeline->buildExecGroup(exec_status, context, concurrency); - assert(!pipeline_exec_group.empty()); + RUNTIME_CHECK(!pipeline_exec_group.empty()); std::vector tasks; tasks.reserve(pipeline_exec_group.size()); - for (auto & pipline_exec : pipeline_exec_group) - tasks.push_back(std::make_unique(mem_tracker, log->identifier(), exec_status, shared_from_this(), std::move(pipline_exec))); + for (auto & pipeline_exec : pipeline_exec_group) + tasks.push_back(std::make_unique(mem_tracker, log->identifier(), exec_status, shared_from_this(), std::move(pipeline_exec))); return tasks; } + +void PlainPipelineEvent::finishImpl() +{ + // Plan nodes in pipeline hold resources like hash table for join, when destruction they will operate memory tracker in MPP task. But MPP task may get destructed once `exec_status.onEventFinish()` is called. + // So pipeline needs to be released before `exec_status.onEventFinish()` is called. + pipeline.reset(); +} } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h index e0b4bd9fe05..6df1c425ae6 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h +++ b/dbms/src/Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h @@ -14,11 +14,14 @@ #pragma once -#include +#include namespace DB { -class PlainPipelineEvent : public PipelineEvent +class Pipeline; +using PipelinePtr = std::shared_ptr; + +class PlainPipelineEvent : public Event { public: PlainPipelineEvent( @@ -28,14 +31,20 @@ class PlainPipelineEvent : public PipelineEvent Context & context_, const PipelinePtr & pipeline_, size_t concurrency_) - : PipelineEvent(exec_status_, std::move(mem_tracker_), req_id, context_, pipeline_) + : Event(exec_status_, std::move(mem_tracker_), req_id) + , context(context_) + , pipeline(pipeline_) , concurrency(concurrency_) {} protected: std::vector scheduleImpl() override; + void finishImpl() override; + private: + Context & context; + PipelinePtr pipeline; size_t concurrency; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp index 82605f95402..018a408e784 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp @@ -54,14 +54,13 @@ void EventTask::finalize() noexcept { try { - bool tmp = false; - if (finalized.compare_exchange_strong(tmp, true)) - finalizeImpl(); + RUNTIME_CHECK(!finalized); + finalized = true; + finalizeImpl(); } catch (...) { - // ignore exception from finalizeImpl. - LOG_WARNING(log, "finalizeImpl throw exception: {}", getCurrentExceptionMessage(true, true)); + exec_status.onErrorOccurred(std::current_exception()); } } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h index 2b884831bd4..80163a80ece 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h @@ -53,7 +53,7 @@ class EventTask : public Task private: PipelineExecutorStatus & exec_status; EventPtr event; - std::atomic_bool finalized{false}; + bool finalized = false; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 45bd8d00203..5170ead776d 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -153,14 +153,13 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec GET_METRIC(tiflash_coprocessor_executor_count, type_exchange_receiver).Increment(); if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) { - size_t fine_grained_stream_count = executor->has_fine_grained_shuffle_stream_count() ? executor->fine_grained_shuffle_stream_count() : 0; - pushBack(PhysicalMockExchangeReceiver::build(context, executor_id, log, executor->exchange_receiver(), fine_grained_stream_count)); + pushBack(PhysicalMockExchangeReceiver::build(context, executor_id, log, executor->exchange_receiver(), FineGrainedShuffle(executor))); } else { // for MPP test, we can use real exchangeReceiver to run an query across different compute nodes // or use one compute node to simulate MPP process. - pushBack(PhysicalExchangeReceiver::build(context, executor_id, log)); + pushBack(PhysicalExchangeReceiver::build(context, executor_id, log, FineGrainedShuffle(executor))); } break; } diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp index cf200be1d6b..7c8c5668207 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp @@ -28,10 +28,12 @@ PhysicalPlanNode::PhysicalPlanNode( const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id) : executor_id(executor_id_) , type(type_) , schema(schema_) + , fine_grained_shuffle(fine_grained_shuffle_) , log(Logger::get(req_id, type_.toString(), executor_id_)) {} @@ -88,7 +90,11 @@ void PhysicalPlanNode::buildBlockInputStream(DAGPipeline & pipeline, Context & c } } -void PhysicalPlanNode::buildPipelineExec(PipelineExecGroupBuilder & /*group_builder*/, Context & /*context*/, size_t /*concurrency*/) +void PhysicalPlanNode::buildPipelineExecGroup( + PipelineExecutorStatus & /*exec_status*/, + PipelineExecGroupBuilder & /*group_builder*/, + Context & /*context*/, + size_t /*concurrency*/) { throw Exception("Unsupport"); } diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.h b/dbms/src/Flash/Planner/PhysicalPlanNode.h index 4d903c2ba59..d25d978be6b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.h +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -28,6 +29,8 @@ struct DAGPipeline; class Context; class DAGContext; +class PipelineExecutorStatus; + struct PipelineExecGroupBuilder; class Pipeline; @@ -44,6 +47,7 @@ class PhysicalPlanNode : public std::enable_shared_from_this const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id); virtual ~PhysicalPlanNode() = default; @@ -60,7 +64,11 @@ class PhysicalPlanNode : public std::enable_shared_from_this virtual void buildBlockInputStream(DAGPipeline & pipeline, Context & context, size_t max_streams); - virtual void buildPipelineExec(PipelineExecGroupBuilder & /*group_builder*/, Context & /*context*/, size_t /*concurrency*/); + virtual void buildPipelineExecGroup( + PipelineExecutorStatus & /*exec_status*/, + PipelineExecGroupBuilder & /*group_builder*/, + Context & /*context*/, + size_t /*concurrency*/); virtual void buildPipeline(PipelineBuilder & builder); @@ -76,6 +84,8 @@ class PhysicalPlanNode : public std::enable_shared_from_this void disableRestoreConcurrency() { is_restore_concurrency = false; } + const FineGrainedShuffle & getFineGrainedShuffle() const { return fine_grained_shuffle; } + String toString(); String toSimpleString(); @@ -89,6 +99,11 @@ class PhysicalPlanNode : public std::enable_shared_from_this PlanType type; NamesAndTypes schema; + // Most operators are not aware of whether they are fine-grained shuffles or not. + // Whether they are fine-grained shuffles or not, their execution remains unchanged. + // Only a few operators need to sense fine-grained shuffle, such as exchange sender/receiver, join, aggregate, window and window sort. + FineGrainedShuffle fine_grained_shuffle; + bool is_tidb_operator = true; bool is_restore_concurrency = true; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index 0aa517c76a3..0f35a2fa6bd 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -76,6 +76,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build( auto physical_agg = std::make_shared( executor_id, schema, + fine_grained_shuffle, log->identifier(), child, before_agg_actions, @@ -83,8 +84,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build( collators, AggregationInterpreterHelper::isFinalAgg(aggregation), aggregate_descriptions, - expr_after_agg_actions, - fine_grained_shuffle); + expr_after_agg_actions); return physical_agg; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.h b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.h index b97492e286f..81ae6005f4c 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.h @@ -35,6 +35,7 @@ class PhysicalAggregation : public PhysicalUnary PhysicalAggregation( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const ExpressionActionsPtr & before_agg_actions_, @@ -42,16 +43,14 @@ class PhysicalAggregation : public PhysicalUnary const TiDB::TiDBCollators & aggregation_collators_, bool is_final_agg_, const AggregateDescriptions & aggregate_descriptions_, - const ExpressionActionsPtr & expr_after_agg_, - const FineGrainedShuffle & fine_grained_shuffle_) - : PhysicalUnary(executor_id_, PlanType::Aggregation, schema_, req_id, child_) + const ExpressionActionsPtr & expr_after_agg_) + : PhysicalUnary(executor_id_, PlanType::Aggregation, schema_, fine_grained_shuffle_, req_id, child_) , before_agg_actions(before_agg_actions_) , aggregation_keys(aggregation_keys_) , aggregation_collators(aggregation_collators_) , is_final_agg(is_final_agg_) , aggregate_descriptions(aggregate_descriptions_) , expr_after_agg(expr_after_agg_) - , fine_grained_shuffle(fine_grained_shuffle_) {} void buildPipeline(PipelineBuilder & builder) override; @@ -63,12 +62,12 @@ class PhysicalAggregation : public PhysicalUnary private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; +private: ExpressionActionsPtr before_agg_actions; Names aggregation_keys; TiDB::TiDBCollators aggregation_collators; bool is_final_agg; AggregateDescriptions aggregate_descriptions; ExpressionActionsPtr expr_after_agg; - FineGrainedShuffle fine_grained_shuffle; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp index c172950ad9c..4d5c4d3faee 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp @@ -20,18 +20,22 @@ namespace DB { -void PhysicalAggregationBuild::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) +void PhysicalAggregationBuild::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) { if (!before_agg_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), before_agg_actions)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), before_agg_actions)); }); } size_t build_index = 0; group_builder.transform([&](auto & builder) { - builder.setSinkOp(std::make_unique(group_builder.exec_status, build_index++, aggregate_context, log->identifier())); + builder.setSinkOp(std::make_unique(exec_status, build_index++, aggregate_context, log->identifier())); }); Block before_agg_header = group_builder.getCurrentHeader(); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h index f9f9794799c..8a94a8df880 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h @@ -38,7 +38,7 @@ class PhysicalAggregationBuild : public PhysicalUnary bool is_final_agg_, const AggregateDescriptions & aggregate_descriptions_, const AggregateContextPtr & aggregate_context_) - : PhysicalUnary(executor_id_, PlanType::AggregationBuild, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::AggregationBuild, schema_, FineGrainedShuffle{}, req_id, child_) , before_agg_actions(before_agg_actions_) , aggregation_keys(aggregation_keys_) , aggregation_collators(aggregation_collators_) @@ -47,7 +47,11 @@ class PhysicalAggregationBuild : public PhysicalUnary , aggregate_context(aggregate_context_) {} - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) override; private: DISABLE_USELESS_FUNCTION_FOR_BREAKER diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp index 92221e90011..74989b4c9a8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp @@ -19,7 +19,11 @@ namespace DB { -void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalAggregationConvergent::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { aggregate_context->initConvergent(); @@ -28,7 +32,7 @@ void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & group_builder.init(1); group_builder.transform([&](auto & builder) { builder.setSourceOp(std::make_unique( - group_builder.exec_status, + exec_status, aggregate_context->getHeader(), log->identifier())); }); @@ -39,7 +43,7 @@ void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & size_t index = 0; group_builder.transform([&](auto & builder) { builder.setSourceOp(std::make_unique( - group_builder.exec_status, + exec_status, aggregate_context, index++, log->identifier())); @@ -49,7 +53,7 @@ void PhysicalAggregationConvergent::buildPipelineExec(PipelineExecGroupBuilder & if (!expr_after_agg->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), expr_after_agg)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), expr_after_agg)); }); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h index 7895986b6cb..9de698d0170 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h @@ -31,12 +31,16 @@ class PhysicalAggregationConvergent : public PhysicalLeaf const String & req_id, const AggregateContextPtr & aggregate_context_, const ExpressionActionsPtr & expr_after_agg_) - : PhysicalLeaf(executor_id_, PlanType::AggregationConvergent, schema_, req_id) + : PhysicalLeaf(executor_id_, PlanType::AggregationConvergent, schema_, FineGrainedShuffle{}, req_id) , expr_after_agg(expr_after_agg_) , aggregate_context(aggregate_context_) {} - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t concurrency) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t concurrency) override; private: DISABLE_USELESS_FUNCTION_FOR_BREAKER diff --git a/dbms/src/Flash/Planner/Plans/PhysicalBinary.h b/dbms/src/Flash/Planner/Plans/PhysicalBinary.h index 22300e2f087..230489d3a50 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalBinary.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalBinary.h @@ -30,10 +30,11 @@ class PhysicalBinary : public PhysicalPlanNode const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & left_, const PhysicalPlanNodePtr & right_) - : PhysicalPlanNode(executor_id_, type_, schema_, req_id) + : PhysicalPlanNode(executor_id_, type_, schema_, fine_grained_shuffle_, req_id) , left(left_) , right(right_) { diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp index 9a74346cb98..63dbffac6fc 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.cpp @@ -31,10 +31,11 @@ namespace DB PhysicalExchangeReceiver::PhysicalExchangeReceiver( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle, const String & req_id, const Block & sample_block_, const std::shared_ptr & mpp_exchange_receiver_) - : PhysicalLeaf(executor_id_, PlanType::ExchangeReceiver, schema_, req_id) + : PhysicalLeaf(executor_id_, PlanType::ExchangeReceiver, schema_, fine_grained_shuffle, req_id) , sample_block(sample_block_) , mpp_exchange_receiver(mpp_exchange_receiver_) {} @@ -42,7 +43,8 @@ PhysicalExchangeReceiver::PhysicalExchangeReceiver( PhysicalPlanNodePtr PhysicalExchangeReceiver::build( const Context & context, const String & executor_id, - const LoggerPtr & log) + const LoggerPtr & log, + const FineGrainedShuffle & fine_grained_shuffle) { auto mpp_exchange_receiver = context.getDAGContext()->getMPPExchangeReceiver(executor_id); if (unlikely(mpp_exchange_receiver == nullptr)) @@ -54,6 +56,7 @@ PhysicalPlanNodePtr PhysicalExchangeReceiver::build( auto physical_exchange_receiver = std::make_shared( executor_id, schema, + fine_grained_shuffle, log->identifier(), Block(schema), mpp_exchange_receiver); @@ -68,13 +71,12 @@ void PhysicalExchangeReceiver::buildBlockInputStreamImpl(DAGPipeline & pipeline, // todo choose a more reasonable stream number auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id]; - const bool enable_fine_grained_shuffle = enableFineGrainedShuffle(mpp_exchange_receiver->getFineGrainedShuffleStreamCount()); String extra_info = "squashing after exchange receiver"; size_t stream_count = max_streams; - if (enable_fine_grained_shuffle) + if (fine_grained_shuffle.enable()) { extra_info += ", " + String(enableFineGrainedShuffleExtraInfo); - stream_count = std::min(max_streams, mpp_exchange_receiver->getFineGrainedShuffleStreamCount()); + stream_count = std::min(max_streams, fine_grained_shuffle.stream_count); } for (size_t i = 0; i < stream_count; ++i) @@ -83,27 +85,30 @@ void PhysicalExchangeReceiver::buildBlockInputStreamImpl(DAGPipeline & pipeline, mpp_exchange_receiver, log->identifier(), execId(), - /*stream_id=*/enable_fine_grained_shuffle ? i : 0); + /*stream_id=*/fine_grained_shuffle.enable() ? i : 0); exchange_receiver_io_input_streams.push_back(stream); stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); } } -void PhysicalExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t concurrency) +void PhysicalExchangeReceiver::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t concurrency) { - // TODO support fine grained shuffle. - const bool enable_fine_grained_shuffle = enableFineGrainedShuffle(mpp_exchange_receiver->getFineGrainedShuffleStreamCount()); - RUNTIME_CHECK(!enable_fine_grained_shuffle); + if (fine_grained_shuffle.enable()) + concurrency = std::min(concurrency, fine_grained_shuffle.stream_count); - // TODO choose a more reasonable concurrency. group_builder.init(concurrency); + size_t partition_id = 0; group_builder.transform([&](auto & builder) { builder.setSourceOp(std::make_unique( - group_builder.exec_status, + exec_status, log->identifier(), mpp_exchange_receiver, - /*stream_id=*/0)); + /*stream_id=*/fine_grained_shuffle.enable() ? partition_id++ : 0)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.h b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.h index f73084d4244..9150900e8f1 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeReceiver.h @@ -27,11 +27,13 @@ class PhysicalExchangeReceiver : public PhysicalLeaf static PhysicalPlanNodePtr build( const Context & context, const String & executor_id, - const LoggerPtr & log); + const LoggerPtr & log, + const FineGrainedShuffle & fine_grained_shuffle); PhysicalExchangeReceiver( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle, const String & req_id, const Block & sample_block_, const std::shared_ptr & mpp_exchange_receiver_); @@ -45,7 +47,11 @@ class PhysicalExchangeReceiver : public PhysicalLeaf return mpp_exchange_receiver->getSourceNum(); } - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp index 0e161469fba..75e3268b0d3 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp @@ -41,12 +41,12 @@ PhysicalPlanNodePtr PhysicalExchangeSender::build( auto physical_exchange_sender = std::make_shared( executor_id, child->getSchema(), + fine_grained_shuffle, log->identifier(), child, partition_col_ids, partition_col_collators, exchange_sender.tp(), - fine_grained_shuffle, exchange_sender.compression()); // executeUnion will be call after sender.transform, so don't need to restore concurrency. physical_exchange_sender->disableRestoreConcurrency(); @@ -65,7 +65,7 @@ void PhysicalExchangeSender::buildBlockInputStreamImpl(DAGPipeline & pipeline, C { extra_info = String(enableFineGrainedShuffleExtraInfo); RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_type)); - RUNTIME_CHECK(fine_grained_shuffle.stream_count <= 1024, fine_grained_shuffle.stream_count); + RUNTIME_CHECK(fine_grained_shuffle.stream_count <= maxFineGrainedStreamCount, fine_grained_shuffle.stream_count); } pipeline.transform([&](auto & stream) { // construct writer @@ -87,10 +87,17 @@ void PhysicalExchangeSender::buildBlockInputStreamImpl(DAGPipeline & pipeline, C }); } -void PhysicalExchangeSender::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) +void PhysicalExchangeSender::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) { - // TODO support fine grained shuffle - RUNTIME_CHECK(!fine_grained_shuffle.enable()); + if (fine_grained_shuffle.enable()) + { + RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_type)); + RUNTIME_CHECK(fine_grained_shuffle.stream_count <= maxFineGrainedStreamCount, fine_grained_shuffle.stream_count); + } group_builder.transform([&](auto & builder) { // construct writer @@ -108,7 +115,7 @@ void PhysicalExchangeSender::buildPipelineExec(PipelineExecGroupBuilder & group_ context.getSettingsRef().batch_send_min_limit_compression, log->identifier(), /*is_async=*/true); - builder.setSinkOp(std::make_unique(group_builder.exec_status, log->identifier(), std::move(response_writer))); + builder.setSinkOp(std::make_unique(exec_status, log->identifier(), std::move(response_writer))); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h index 121238cc6eb..6f6e5460cc3 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h @@ -34,26 +34,29 @@ class PhysicalExchangeSender : public PhysicalUnary PhysicalExchangeSender( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const std::vector & partition_col_ids_, const TiDB::TiDBCollators & collators_, const tipb::ExchangeType & exchange_type_, - const FineGrainedShuffle & fine_grained_shuffle_, const tipb::CompressionMode & compression_mode_) - : PhysicalUnary(executor_id_, PlanType::ExchangeSender, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::ExchangeSender, schema_, fine_grained_shuffle_, req_id, child_) , partition_col_ids(partition_col_ids_) , partition_col_collators(collators_) , exchange_type(exchange_type_) , compression_mode(compression_mode_) - , fine_grained_shuffle(fine_grained_shuffle_) {} void finalize(const Names & parent_require) override; const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; @@ -62,7 +65,5 @@ class PhysicalExchangeSender : public PhysicalUnary TiDB::TiDBCollators partition_col_collators; tipb::ExchangeType exchange_type; tipb::CompressionMode compression_mode; - - FineGrainedShuffle fine_grained_shuffle; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp index fdba8dd1b8a..5f8574eee4d 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp @@ -64,11 +64,11 @@ PhysicalPlanNodePtr PhysicalExpand::build( auto physical_expand = std::make_shared( executor_id, expand_output_columns, + child->getFineGrainedShuffle(), log->identifier(), child, expand_action.expand, before_expand_actions); - return physical_expand; } @@ -82,10 +82,14 @@ void PhysicalExpand::expandTransform(DAGPipeline & child_pipeline) }); } -void PhysicalExpand::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context &, size_t) +void PhysicalExpand::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), expand_actions)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), expand_actions)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExpand.h b/dbms/src/Flash/Planner/Plans/PhysicalExpand.h index fa668b64114..5289b816e4e 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExpand.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalExpand.h @@ -34,11 +34,12 @@ class PhysicalExpand : public PhysicalUnary PhysicalExpand( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const std::shared_ptr & shared_expand, const ExpressionActionsPtr & expand_actions) - : PhysicalUnary(executor_id_, PlanType::Expand, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::Expand, schema_, fine_grained_shuffle_, req_id, child_) , shared_expand(shared_expand) , expand_actions(expand_actions) {} @@ -49,7 +50,11 @@ class PhysicalExpand : public PhysicalUnary const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp index a3d353a60a1..1aa8f106dce 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalFilter.cpp @@ -42,6 +42,7 @@ PhysicalPlanNodePtr PhysicalFilter::build( auto physical_filter = std::make_shared( executor_id, child->getSchema(), + child->getFineGrainedShuffle(), log->identifier(), child, filter_column_name, @@ -57,11 +58,15 @@ void PhysicalFilter::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, before_filter_actions, filter_column, log->identifier()); }); } -void PhysicalFilter::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalFilter::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { auto input_header = group_builder.getCurrentHeader(); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), input_header, before_filter_actions, filter_column)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), input_header, before_filter_actions, filter_column)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalFilter.h b/dbms/src/Flash/Planner/Plans/PhysicalFilter.h index 08f420530e2..f22a57b456e 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalFilter.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalFilter.h @@ -33,11 +33,12 @@ class PhysicalFilter : public PhysicalUnary PhysicalFilter( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const String & filter_column_, const ExpressionActionsPtr & before_filter_actions_) - : PhysicalUnary(executor_id_, PlanType::Filter, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::Filter, schema_, fine_grained_shuffle_, req_id, child_) , filter_column(filter_column_) , before_filter_actions(before_filter_actions_) {} @@ -46,7 +47,11 @@ class PhysicalFilter : public PhysicalUnary const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp index e1ac74f8b32..d7a81162a01 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.cpp @@ -20,16 +20,27 @@ namespace DB { PhysicalPlanNodePtr PhysicalGetResultSink::build( ResultHandler && result_handler, + const LoggerPtr & log, const PhysicalPlanNodePtr & child) { - return std::make_shared("get_result_sink", child->getSchema(), "", child, std::move(result_handler)); + return std::make_shared( + "get_result_sink", + child->getSchema(), + child->getFineGrainedShuffle(), + log->identifier(), + child, + std::move(result_handler)); } -void PhysicalGetResultSink::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalGetResultSink::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { auto this_shared_ptr = std::static_pointer_cast(shared_from_this()); group_builder.transform([&](auto & builder) { - builder.setSinkOp(std::make_unique(group_builder.exec_status, log->identifier(), this_shared_ptr)); + builder.setSinkOp(std::make_unique(exec_status, log->identifier(), this_shared_ptr)); }); } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h index 5d3d42df298..5e17d2ce269 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h @@ -26,15 +26,17 @@ class PhysicalGetResultSink : public PhysicalUnary public: static PhysicalPlanNodePtr build( ResultHandler && result_handler, + const LoggerPtr & log, const PhysicalPlanNodePtr & child); PhysicalGetResultSink( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, ResultHandler && result_handler_) - : PhysicalUnary(executor_id_, PlanType::GetResult, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::GetResult, schema_, fine_grained_shuffle_, req_id, child_) , result_handler(std::move(result_handler_)) { assert(!result_handler.isIgnored()); @@ -50,7 +52,11 @@ class PhysicalGetResultSink : public PhysicalUnary throw Exception("Unsupport"); } - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: friend class GetResultSinkOp; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index c94c43e0656..88f7850abc3 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -149,14 +149,14 @@ PhysicalPlanNodePtr PhysicalJoin::build( auto physical_join = std::make_shared( executor_id, join_output_schema, + fine_grained_shuffle, log->identifier(), probe_plan, build_plan, join_ptr, probe_side_prepare_actions, build_side_prepare_actions, - Block(join_output_schema), - fine_grained_shuffle); + Block(join_output_schema)); return physical_join; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.h b/dbms/src/Flash/Planner/Plans/PhysicalJoin.h index 4a1f29fb5e8..e1fc5351c7a 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.h @@ -36,20 +36,19 @@ class PhysicalJoin : public PhysicalBinary PhysicalJoin( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & probe_, const PhysicalPlanNodePtr & build_, const JoinPtr & join_ptr_, const ExpressionActionsPtr & probe_side_prepare_actions_, const ExpressionActionsPtr & build_side_prepare_actions_, - const Block & sample_block_, - const FineGrainedShuffle & fine_grained_shuffle_) - : PhysicalBinary(executor_id_, PlanType::Join, schema_, req_id, probe_, build_) + const Block & sample_block_) + : PhysicalBinary(executor_id_, PlanType::Join, schema_, fine_grained_shuffle_, req_id, probe_, build_) , join_ptr(join_ptr_) , probe_side_prepare_actions(probe_side_prepare_actions_) , build_side_prepare_actions(build_side_prepare_actions_) , sample_block(sample_block_) - , fine_grained_shuffle(fine_grained_shuffle_) {} void buildPipeline(PipelineBuilder & builder) override; @@ -78,6 +77,5 @@ class PhysicalJoin : public PhysicalBinary ExpressionActionsPtr build_side_prepare_actions; Block sample_block; - FineGrainedShuffle fine_grained_shuffle; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/Plans/PhysicalLeaf.h index b2e0ded27d8..ffcb8c47c46 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLeaf.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalLeaf.h @@ -29,8 +29,9 @@ class PhysicalLeaf : public PhysicalPlanNode const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id) - : PhysicalPlanNode(executor_id_, type_, schema_, req_id) + : PhysicalPlanNode(executor_id_, type_, schema_, fine_grained_shuffle_, req_id) {} PhysicalPlanNodePtr children(size_t) const override diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp index 5144ad6c11d..746a7876564 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp @@ -35,6 +35,7 @@ PhysicalPlanNodePtr PhysicalLimit::build( auto physical_limit = std::make_shared( executor_id, child->getSchema(), + child->getFineGrainedShuffle(), log->identifier(), child, limit.limit()); @@ -53,12 +54,16 @@ void PhysicalLimit::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & } } -void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalLimit::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { auto input_header = group_builder.getCurrentHeader(); auto global_limit = std::make_shared(input_header, limit); group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), global_limit)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), global_limit)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.h b/dbms/src/Flash/Planner/Plans/PhysicalLimit.h index b8e026a6b49..7589b564bb8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLimit.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.h @@ -31,10 +31,11 @@ class PhysicalLimit : public PhysicalUnary PhysicalLimit( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, size_t limit_) - : PhysicalUnary(executor_id_, PlanType::Limit, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::Limit, schema_, fine_grained_shuffle_, req_id, child_) , limit(limit_) {} @@ -42,7 +43,11 @@ class PhysicalLimit : public PhysicalUnary const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp index 7d5dca38f14..d3d2efc11e0 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.cpp @@ -28,11 +28,12 @@ namespace DB PhysicalMockExchangeReceiver::PhysicalMockExchangeReceiver( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const Block & sample_block_, const BlockInputStreams & mock_streams_, size_t source_num_) - : PhysicalLeaf(executor_id_, PlanType::MockExchangeReceiver, schema_, req_id) + : PhysicalLeaf(executor_id_, PlanType::MockExchangeReceiver, schema_, fine_grained_shuffle_, req_id) , sample_block(sample_block_) , mock_streams(mock_streams_) , source_num(source_num_) @@ -43,13 +44,14 @@ PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build( const String & executor_id, const LoggerPtr & log, const tipb::ExchangeReceiver & exchange_receiver, - size_t fine_grained_stream_count) + const FineGrainedShuffle & fine_grained_shuffle) { - auto [schema, mock_streams] = mockSchemaAndStreamsForExchangeReceiver(context, executor_id, log, exchange_receiver, fine_grained_stream_count); + auto [schema, mock_streams] = mockSchemaAndStreamsForExchangeReceiver(context, executor_id, log, exchange_receiver, fine_grained_shuffle.stream_count); auto physical_mock_exchange_receiver = std::make_shared( executor_id, schema, + fine_grained_shuffle, log->identifier(), Block(schema), mock_streams, @@ -63,12 +65,16 @@ void PhysicalMockExchangeReceiver::buildBlockInputStreamImpl(DAGPipeline & pipel pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); } -void PhysicalMockExchangeReceiver::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalMockExchangeReceiver::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(group_builder.exec_status, log->identifier(), mock_streams[i++])); + builder.setSourceOp(std::make_unique(exec_status, log->identifier(), mock_streams[i++])); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.h b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.h index 26b2c72caed..ac3bf4561ff 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeReceiver.h @@ -33,11 +33,12 @@ class PhysicalMockExchangeReceiver : public PhysicalLeaf const String & executor_id, const LoggerPtr & log, const tipb::ExchangeReceiver & exchange_receiver, - size_t fine_grained_stream_count); + const FineGrainedShuffle & fine_grained_shuffle); PhysicalMockExchangeReceiver( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const Block & sample_block_, const BlockInputStreams & mock_streams, @@ -49,7 +50,11 @@ class PhysicalMockExchangeReceiver : public PhysicalLeaf size_t getSourceNum() const { return source_num; }; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.cpp index 490ec164ae6..78b96d2c5d4 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.cpp @@ -30,6 +30,7 @@ PhysicalPlanNodePtr PhysicalMockExchangeSender::build( auto physical_mock_exchange_sender = std::make_shared( executor_id, child->getSchema(), + FineGrainedShuffle{}, log->identifier(), child); // executeUnion will be call after sender.transform, so don't need to restore concurrency. diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.h b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.h index 445222d094d..cc5a84c8a2c 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockExchangeSender.h @@ -31,16 +31,22 @@ class PhysicalMockExchangeSender : public PhysicalUnary PhysicalMockExchangeSender( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_) - : PhysicalUnary(executor_id_, PlanType::MockExchangeSender, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::MockExchangeSender, schema_, fine_grained_shuffle_, req_id, child_) {} void finalize(const Names & parent_require) override; const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & /*group_builder*/, Context & /*context*/, size_t /*concurrency*/) override {} + void buildPipelineExecGroup( + PipelineExecutorStatus & /*exec_status*/, + PipelineExecGroupBuilder & /*group_builder*/, + Context & /*context*/, + size_t /*concurrency*/) override + {} private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index 7a7798d7501..a921d7729d5 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -80,7 +80,7 @@ PhysicalMockTableScan::PhysicalMockTableScan( const Block & sample_block_, const BlockInputStreams & mock_streams_, Int64 table_id_) - : PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, req_id) + : PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, FineGrainedShuffle{}, req_id) , sample_block(sample_block_) , mock_streams(mock_streams_) , table_id(table_id_) @@ -111,11 +111,15 @@ void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Co pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); } -void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency) +void PhysicalMockTableScan::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t concurrency) { if (context.mockStorage()->useDeltaMerge()) { - auto source_ops = context.mockStorage()->getSourceOpsFromDeltaMerge(group_builder.exec_status, context, table_id, concurrency); + auto source_ops = context.mockStorage()->getSourceOpsFromDeltaMerge(exec_status, context, table_id, concurrency); group_builder.init(source_ops.size()); size_t i = 0; group_builder.transform([&](auto & builder) { @@ -127,7 +131,7 @@ void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_b group_builder.init(mock_streams.size()); size_t i = 0; group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(group_builder.exec_status, log->identifier(), mock_streams[i++])); + builder.setSourceOp(std::make_unique(exec_status, log->identifier(), mock_streams[i++])); }); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 7330b5d16c4..98dd0409ab4 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -48,7 +48,11 @@ class PhysicalMockTableScan : public PhysicalLeaf const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t concurrency) override; void initStreams(Context & context); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp index 9eea80a077c..c175fb00ac7 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalProjection.cpp @@ -50,6 +50,7 @@ PhysicalPlanNodePtr PhysicalProjection::build( auto physical_projection = std::make_shared( executor_id, schema, + child->getFineGrainedShuffle(), log->identifier(), child, "projection", @@ -82,6 +83,7 @@ PhysicalPlanNodePtr PhysicalProjection::buildNonRootFinal( auto physical_projection = std::make_shared( child->execId(), schema, + child->getFineGrainedShuffle(), log->identifier(), child, "final projection", @@ -127,6 +129,7 @@ PhysicalPlanNodePtr PhysicalProjection::buildRootFinal( auto physical_projection = std::make_shared( child->execId(), schema, + child->getFineGrainedShuffle(), log->identifier(), child, "final projection", @@ -143,12 +146,16 @@ void PhysicalProjection::buildBlockInputStreamImpl(DAGPipeline & pipeline, Conte executeExpression(pipeline, project_actions, log, extra_info); } -void PhysicalProjection::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +void PhysicalProjection::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) { if (project_actions && !project_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), project_actions)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), project_actions)); }); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalProjection.h b/dbms/src/Flash/Planner/Plans/PhysicalProjection.h index ab1e1936a86..c0f22712516 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalProjection.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalProjection.h @@ -53,11 +53,12 @@ class PhysicalProjection : public PhysicalUnary PhysicalProjection( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const String & extra_info_, const ExpressionActionsPtr & project_actions_) - : PhysicalUnary(executor_id_, PlanType::Projection, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::Projection, schema_, fine_grained_shuffle_, req_id, child_) , extra_info(extra_info_) , project_actions(project_actions_) {} @@ -66,7 +67,11 @@ class PhysicalProjection : public PhysicalUnary const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & /*context*/, + size_t /*concurrency*/) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp index cafae6375a5..a9726054501 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp @@ -32,7 +32,7 @@ PhysicalTableScan::PhysicalTableScan( const String & req_id, const TiDBTableScan & tidb_table_scan_, const Block & sample_block_) - : PhysicalLeaf(executor_id_, PlanType::TableScan, schema_, req_id) + : PhysicalLeaf(executor_id_, PlanType::TableScan, schema_, FineGrainedShuffle{}, req_id) , tidb_table_scan(tidb_table_scan_) , sample_block(sample_block_) {} diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp index 5fe779b6e46..f23b2d43e77 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp @@ -51,6 +51,7 @@ PhysicalPlanNodePtr PhysicalTopN::build( auto physical_top_n = std::make_shared( executor_id, child->getSchema(), + child->getFineGrainedShuffle(), log->identifier(), child, order_descr, @@ -68,16 +69,20 @@ void PhysicalTopN::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & c orderStreams(pipeline, max_streams, order_descr, limit, false, context, log); } -void PhysicalTopN::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/) +void PhysicalTopN::buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) { if (!before_sort_actions->getActions().empty()) { group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), before_sort_actions)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), before_sort_actions)); }); } group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique(group_builder.exec_status, log->identifier(), order_descr, limit, context.getSettingsRef().max_block_size)); + builder.appendTransformOp(std::make_unique(exec_status, log->identifier(), order_descr, limit, context.getSettingsRef().max_block_size)); }); } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTopN.h b/dbms/src/Flash/Planner/Plans/PhysicalTopN.h index af1482cea7d..4964e1f8534 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTopN.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalTopN.h @@ -34,12 +34,13 @@ class PhysicalTopN : public PhysicalUnary PhysicalTopN( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, const SortDescription & order_descr_, const ExpressionActionsPtr & before_sort_actions_, size_t limit_) - : PhysicalUnary(executor_id_, PlanType::TopN, schema_, req_id, child_) + : PhysicalUnary(executor_id_, PlanType::TopN, schema_, fine_grained_shuffle_, req_id, child_) , order_descr(order_descr_) , before_sort_actions(before_sort_actions_) , limit(limit_) @@ -49,7 +50,11 @@ class PhysicalTopN : public PhysicalUnary const Block & getSampleBlock() const override; - void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency) override; + void buildPipelineExecGroup( + PipelineExecutorStatus & exec_status, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t concurrency) override; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalUnary.h b/dbms/src/Flash/Planner/Plans/PhysicalUnary.h index 47f32643be4..96e2551d63d 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalUnary.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalUnary.h @@ -31,9 +31,10 @@ class PhysicalUnary : public PhysicalPlanNode const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_) - : PhysicalPlanNode(executor_id_, type_, schema_, req_id) + : PhysicalPlanNode(executor_id_, type_, schema_, fine_grained_shuffle_, req_id) , child(child_) { RUNTIME_ASSERT(child, log, "children(0) shouldn't be nullptr"); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalWindow.cpp b/dbms/src/Flash/Planner/Plans/PhysicalWindow.cpp index c6134574571..22bae4eb40a 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalWindow.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalWindow.cpp @@ -54,10 +54,10 @@ PhysicalPlanNodePtr PhysicalWindow::build( auto physical_window = std::make_shared( executor_id, schema, + fine_grained_shuffle, log->identifier(), child, - window_description, - fine_grained_shuffle); + window_description); return physical_window; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalWindow.h b/dbms/src/Flash/Planner/Plans/PhysicalWindow.h index 05a7b5a6eca..e7b9314b36b 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalWindow.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalWindow.h @@ -35,13 +35,12 @@ class PhysicalWindow : public PhysicalUnary PhysicalWindow( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, - const WindowDescription & window_description_, - const FineGrainedShuffle & fine_grained_shuffle_) - : PhysicalUnary(executor_id_, PlanType::Window, schema_, req_id, child_) + const WindowDescription & window_description_) + : PhysicalUnary(executor_id_, PlanType::Window, schema_, fine_grained_shuffle_, req_id, child_) , window_description(window_description_) - , fine_grained_shuffle(fine_grained_shuffle_) {} void finalize(const Names & parent_require) override; @@ -53,6 +52,5 @@ class PhysicalWindow : public PhysicalUnary private: WindowDescription window_description; - FineGrainedShuffle fine_grained_shuffle; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.cpp b/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.cpp index a35578fa3e2..0a2445d67c6 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.cpp @@ -43,10 +43,10 @@ PhysicalPlanNodePtr PhysicalWindowSort::build( auto physical_window_sort = std::make_shared( executor_id, child->getSchema(), + fine_grained_shuffle, log->identifier(), child, - order_descr, - fine_grained_shuffle); + order_descr); return physical_window_sort; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.h b/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.h index 3ad4251a19a..2692bb6d387 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalWindowSort.h @@ -35,13 +35,12 @@ class PhysicalWindowSort : public PhysicalUnary PhysicalWindowSort( const String & executor_id_, const NamesAndTypes & schema_, + const FineGrainedShuffle & fine_grained_shuffle_, const String & req_id, const PhysicalPlanNodePtr & child_, - const SortDescription & order_descr_, - const FineGrainedShuffle & fine_grained_shuffle_) - : PhysicalUnary(executor_id_, PlanType::WindowSort, schema_, req_id, child_) + const SortDescription & order_descr_) + : PhysicalUnary(executor_id_, PlanType::WindowSort, schema_, fine_grained_shuffle_, req_id, child_) , order_descr(order_descr_) - , fine_grained_shuffle(fine_grained_shuffle_) {} void finalize(const Names & parent_require) override; @@ -53,6 +52,5 @@ class PhysicalWindowSort : public PhysicalUnary private: SortDescription order_descr; - FineGrainedShuffle fine_grained_shuffle; }; } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 27543f1f1fd..5dbf7db9c3c 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -198,14 +198,18 @@ exchange_sender_4 | type:PassThrough, {<0, Long>} exchange_sender_4 | type:PassThrough, {<0, Long>} project_3 | {<0, Long>} exchange_receiver_2 | type:Hash, {<0, Long>})"}; - ASSERT_MPPTASK_EQUAL_PLAN_AND_RESULT( - context - .scan("test_db", "big_table") - .exchangeSender(tipb::ExchangeType::Hash) - .exchangeReceiver("recv", {{"s1", TiDB::TP::TypeLong}}) - .project({"s1"}), - expected_strings, - expected_cols); + std::vector fine_grained_shuffle_stream_count{8, 0}; + for (uint64_t stream_count : fine_grained_shuffle_stream_count) + { + ASSERT_MPPTASK_EQUAL_PLAN_AND_RESULT( + context + .scan("test_db", "big_table") + .exchangeSender(tipb::ExchangeType::Hash, {"test_db.big_table.s1"}, stream_count) + .exchangeReceiver("recv", {{"s1", TiDB::TP::TypeLong}}, stream_count) + .project({"s1"}), + expected_strings, + expected_cols); + } } WRAP_FOR_SERVER_TEST_END } diff --git a/dbms/src/Flash/tests/gtest_fine_grained_shuffle.cpp b/dbms/src/Flash/tests/gtest_fine_grained_shuffle.cpp new file mode 100644 index 00000000000..b405b03be79 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_fine_grained_shuffle.cpp @@ -0,0 +1,90 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB::tests +{ +class FineGrainedShuffleTestRunner : public DB::tests::ExecutorTest +{ + void initializeContext() override + { + ExecutorTest::initializeContext(); + + DB::MockColumnInfoVec column_infos{{"partition", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeLong}}; + DB::MockColumnInfoVec partition_column_infos{{"partition", TiDB::TP::TypeLong}}; + ColumnsWithTypeAndName column_data; + ColumnsWithTypeAndName common_column_data; + size_t table_rows = 1024; + for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos)) + { + ColumnGeneratorOpts opts{table_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name}; + column_data.push_back(ColumnGenerator::instance().generate(opts)); + } + ColumnWithTypeAndName shuffle_column = ColumnGenerator::instance().generate({table_rows, "UInt64", RANDOM}); + IColumn::Permutation perm; + shuffle_column.column->getPermutation(false, 0, -1, perm); + for (auto & column : column_data) + { + column.column = column.column->permute(perm, 0); + } + + context.addExchangeReceiver("exchange_receiver_1_concurrency", column_infos, column_data, 1, partition_column_infos); + context.addExchangeReceiver("exchange_receiver_3_concurrency", column_infos, column_data, 3, partition_column_infos); + context.addExchangeReceiver("exchange_receiver_5_concurrency", column_infos, column_data, 5, partition_column_infos); + context.addExchangeReceiver("exchange_receiver_10_concurrency", column_infos, column_data, 10, partition_column_infos); + } +}; + +TEST_F(FineGrainedShuffleTestRunner, simpleReceiver) +try +{ + std::vector exchange_receiver_concurrency = {1, 3, 5, 10}; + + auto gen_request = [&](size_t exchange_concurrency) { + return context + .receive(fmt::format("exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency) + .build(context); + }; + + auto baseline = executeStreams(gen_request(1), 1); + for (size_t exchange_concurrency : exchange_receiver_concurrency) + { + executeAndAssertColumnsEqual(gen_request(exchange_concurrency), baseline); + } +} +CATCH + +TEST_F(FineGrainedShuffleTestRunner, FineGrainedShuffleReceiverAndThenNonFineGrainedShuffleAgg) +try +{ + std::vector exchange_receiver_concurrency = {1, 3, 5, 10}; + + auto gen_request = [&](size_t exchange_concurrency) { + return context + .receive(fmt::format("exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency) + .aggregation({Max(col("partition"))}, {col("value")}) + .build(context); + }; + + auto baseline = executeStreams(gen_request(1), 1); + for (size_t exchange_concurrency : exchange_receiver_concurrency) + { + executeAndAssertColumnsEqual(gen_request(exchange_concurrency), baseline); + } +} +CATCH + +} // namespace DB::tests diff --git a/dbms/src/Flash/tests/gtest_window_executor.cpp b/dbms/src/Flash/tests/gtest_window_executor.cpp index bfae19c24d2..220c189f7eb 100644 --- a/dbms/src/Flash/tests/gtest_window_executor.cpp +++ b/dbms/src/Flash/tests/gtest_window_executor.cpp @@ -18,8 +18,6 @@ namespace DB::tests { class WindowExecutorTestRunner : public DB::tests::ExecutorTest { - static const size_t max_concurrency_level = 10; - public: void initializeContext() override { diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index b75b012eda1..dcce7b2c034 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -166,6 +167,8 @@ void ExecutorTest::executeExecutor( std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &)> assert_func) { WRAP_FOR_TEST_BEGIN + if (enable_pipeline && !Pipeline::isSupported(*request)) + continue; std::vector concurrencies{1, 2, 10}; for (auto concurrency : concurrencies) { @@ -186,6 +189,8 @@ void ExecutorTest::checkBlockSorted( std::function<::testing::AssertionResult(const ColumnsWithTypeAndName &, const ColumnsWithTypeAndName &)> assert_func) { WRAP_FOR_TEST_BEGIN + if (enable_pipeline && !Pipeline::isSupported(*request)) + continue; std::vector concurrencies{2, 5, 10}; for (auto concurrency : concurrencies) { @@ -208,8 +213,8 @@ void ExecutorTest::checkBlockSorted( ASSERT_TRUE(assert_func(expected_res, res)) << testInfoMsg(request, enable_planner, enable_pipeline, concurrency, block_size); } }; - WRAP_FOR_TEST_END } + WRAP_FOR_TEST_END } void ExecutorTest::executeAndAssertColumnsEqual(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns) diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index b4cd1e0e108..479a18dd8cb 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -268,10 +268,15 @@ DAGRequestBuilder & DAGRequestBuilder::project(MockColumnNameVec col_names) return *this; } -DAGRequestBuilder & DAGRequestBuilder::exchangeSender(tipb::ExchangeType exchange_type) +DAGRequestBuilder & DAGRequestBuilder::exchangeSender(tipb::ExchangeType exchange_type, MockColumnNameVec part_keys, uint64_t fine_grained_shuffle_stream_count) { assert(root); - root = mock::compileExchangeSender(root, getExecutorIndex(), exchange_type); + auto partition_key_list = std::make_shared(); + for (const auto & part_key : part_keys) + { + partition_key_list->children.push_back(col(part_key)); + } + root = mock::compileExchangeSender(root, getExecutorIndex(), exchange_type, partition_key_list, fine_grained_shuffle_stream_count); return *this; } diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 0b18bd3b54f..999f72ab25c 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -112,8 +112,7 @@ class DAGRequestBuilder } DAGRequestBuilder & project(MockColumnNameVec col_names); - - DAGRequestBuilder & exchangeSender(tipb::ExchangeType exchange_type); + DAGRequestBuilder & exchangeSender(tipb::ExchangeType exchange_type, MockColumnNameVec part_keys = {}, uint64_t fine_grained_shuffle_stream_count = 0); /// User should prefer using other simplified join buidler API instead of this one unless he/she have to test /// join conditional expressions and knows how TiDB translates sql's `join on` clause to conditional expressions. diff --git a/tests/docker/config/tiflash_dt_enable_pipeline.toml b/tests/docker/config/tiflash_dt_enable_pipeline.toml new file mode 100644 index 00000000000..18a24fd0253 --- /dev/null +++ b/tests/docker/config/tiflash_dt_enable_pipeline.toml @@ -0,0 +1,55 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +tmp_path = "/tmp/tiflash/data/tmp" + +path = "/tmp/tiflash/data/db" +capacity = "10737418240" + +mark_cache_size = 5368709120 +minmax_index_cache_size = 5368709120 +tcp_port = 9000 +http_port = 8123 + +[flash] +tidb_status_addr = "tidb0:10080" +service_addr = "0.0.0.0:3930" +[flash.flash_cluster] +update_rule_interval = 5 +[flash.proxy] +addr = "0.0.0.0:20170" +advertise-addr = "tiflash0:20170" +data-dir = "/data" +config = "/proxy.toml" +log-file = "/log/proxy.log" +engine-addr = "tiflash0:3930" +status-addr = "0.0.0.0:20181" +advertise-status-addr = "tiflash0:20181" + +[logger] +count = 10 +errorlog = "/tmp/tiflash/log/error.log" +size = "1000M" +log = "/tmp/tiflash/log/server.log" +level = "trace" + +[raft] +pd_addr = "pd0:2379" +ignore_databases = "system,default" + +[profiles] +[profiles.default] +enable_planner = 1 +enable_pipeline = 1 +# max_memory_usage = 0 diff --git a/tests/docker/tiflash-dt-enable-pipeline.yaml b/tests/docker/tiflash-dt-enable-pipeline.yaml new file mode 100644 index 00000000000..463cb3ea71e --- /dev/null +++ b/tests/docker/tiflash-dt-enable-pipeline.yaml @@ -0,0 +1,43 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '2.3' + +services: + # for tests under fullstack-test directory + # (engine DeltaTree) + tiflash0: + image: hub.pingcap.net/tiflash/tiflash-ci-base + volumes: + - ./config/tiflash_dt_enable_pipeline.toml:/config.toml:ro + - ./data/tiflash:/tmp/tiflash/data + - ./log/tiflash:/tmp/tiflash/log + - ..:/tests + - ../docker/_env.sh:/tests/_env.sh + - ./log/tiflash-cluster-manager:/tmp/tiflash/data/tmp + - ./config/proxy.toml:/proxy.toml:ro + - ./config/cipher-file-256:/cipher-file-256:ro + - ./data/proxy:/data + - ./log/proxy:/log + - ../.build/tiflash:/tiflash + entrypoint: + - /tiflash/tiflash + - server + - --config-file + - /config.toml + restart: on-failure + depends_on: + - "pd0" + - "tikv0" + diff --git a/tests/tidb-ci/enable_pipeline b/tests/tidb-ci/enable_pipeline new file mode 120000 index 00000000000..1c6442406ac --- /dev/null +++ b/tests/tidb-ci/enable_pipeline @@ -0,0 +1 @@ +../fullstack-test/mpp \ No newline at end of file diff --git a/tests/tidb-ci/run.sh b/tests/tidb-ci/run.sh index 18a242fe4d3..bb7a5f5283d 100755 --- a/tests/tidb-ci/run.sh +++ b/tests/tidb-ci/run.sh @@ -54,6 +54,13 @@ docker-compose -f cluster.yaml -f tiflash-dt-disable-planner.yaml exec -T tiflas docker-compose -f cluster.yaml -f tiflash-dt-disable-planner.yaml down clean_data_log +docker-compose -f cluster.yaml -f tiflash-dt-enable-pipeline.yaml up -d +wait_env +docker-compose -f cluster.yaml -f tiflash-dt-enable-pipeline.yaml exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh tidb-ci/enable_pipeline' + +docker-compose -f cluster.yaml -f tiflash-dt-enable-pipeline.yaml down +clean_data_log + # run new_collation_fullstack tests docker-compose -f cluster_new_collation.yaml -f tiflash-dt.yaml down clean_data_log diff --git a/tests/tidb-ci/tiflash-dt-enable-pipeline.yaml b/tests/tidb-ci/tiflash-dt-enable-pipeline.yaml new file mode 120000 index 00000000000..75059107013 --- /dev/null +++ b/tests/tidb-ci/tiflash-dt-enable-pipeline.yaml @@ -0,0 +1 @@ +../docker/tiflash-dt-enable-pipeline.yaml \ No newline at end of file