diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index c418dcbb927..5deac871a8a 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -177,7 +177,7 @@ class MultiplexInputStream final : public IProfilingBlockInputStream if (cur_stream) { - if (IProfilingBlockInputStream * child = dynamic_cast(&*cur_stream)) + if (auto * child = dynamic_cast(&*cur_stream)) { child->cancel(kill); } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 9c96612070b..85443875741 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -1146,10 +1147,10 @@ void DAGStorageInterpreter::buildLocalExec( return; mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; const auto table_query_infos = generateSelectQueryInfos(); + bool has_multiple_partitions = table_query_infos.size() > 1; + ConcatBuilderPool builder_pool{max_streams}; auto disaggregated_snap = std::make_shared(); - // TODO Improve the performance of partition table in extreme case. - // ref https://github.com/pingcap/tiflash/issues/4474 for (const auto & table_query_info : table_query_infos) { PipelineExecGroupBuilder builder; @@ -1161,7 +1162,10 @@ void DAGStorageInterpreter::buildLocalExec( disaggregated_snap->addTask(table_id, std::move(table_snap)); } - group_builder.merge(std::move(builder)); + if (has_multiple_partitions) + builder_pool.add(builder); + else + group_builder.merge(std::move(builder)); } LOG_DEBUG( @@ -1179,6 +1183,11 @@ void DAGStorageInterpreter::buildLocalExec( bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at); RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id); } + + if (has_multiple_partitions) + { + builder_pool.generate(group_builder, exec_status, log->identifier()); + } } std::unordered_map DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version) diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp index 15ab3765e69..93bfcdcff6a 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp @@ -91,6 +91,13 @@ void PipelineExecGroupBuilder::addConcurrency(SourceOpPtr && source) cur_group.back().setSourceOp(std::move(source)); } +void PipelineExecGroupBuilder::addConcurrency(PipelineExecBuilder && exec_builder) +{ + RUNTIME_CHECK(exec_builder.source_op); + auto & cur_group = getCurGroup(); + cur_group.push_back(std::move(exec_builder)); +} + void PipelineExecGroupBuilder::reset() { groups.clear(); diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h index 01d8554a086..0d097823491 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h @@ -56,12 +56,16 @@ class PipelineExecGroupBuilder void addGroup() { groups.emplace_back(); } + size_t groupCnt() const { return groups.size(); } + size_t concurrency() const { return getCurGroup().size(); } bool empty() const { return getCurGroup().empty(); } void addConcurrency(SourceOpPtr && source); + void addConcurrency(PipelineExecBuilder && exec_builder); + void reset(); void merge(PipelineExecGroupBuilder && other); diff --git a/dbms/src/Functions/FunctionsStringSearch.cpp b/dbms/src/Functions/FunctionsStringSearch.cpp index a02280f0769..1ed4349ed35 100644 --- a/dbms/src/Functions/FunctionsStringSearch.cpp +++ b/dbms/src/Functions/FunctionsStringSearch.cpp @@ -34,10 +34,6 @@ #include #include -#include "Columns/IColumn.h" -#include "Common/Exception.h" -#include "common/defines.h" - #if USE_RE2_ST #include #else diff --git a/dbms/src/Operators/ConcatSourceOp.h b/dbms/src/Operators/ConcatSourceOp.h new file mode 100644 index 00000000000..78b82e8d3ed --- /dev/null +++ b/dbms/src/Operators/ConcatSourceOp.h @@ -0,0 +1,229 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include + +namespace DB +{ +class SetBlockSinkOp : public SinkOp +{ +public: + SetBlockSinkOp( + PipelineExecutorStatus & exec_status_, + const String & req_id, + Block & res_) + : SinkOp(exec_status_, req_id) + , res(res_) + { + } + + String getName() const override + { + return "SetBlockSinkOp"; + } + +protected: + OperatorStatus writeImpl(Block && block) override + { + if unlikely (!block) + return OperatorStatus::FINISHED; + + assert(!res); + res = std::move(block); + return OperatorStatus::NEED_INPUT; + } + +private: + Block & res; +}; + +/// Used to merge multiple partitioned tables of storage layer. +class ConcatSourceOp : public SourceOp +{ +public: + ConcatSourceOp( + PipelineExecutorStatus & exec_status_, + const String & req_id, + std::vector & exec_builder_pool) + : SourceOp(exec_status_, req_id) + { + RUNTIME_CHECK(!exec_builder_pool.empty()); + setHeader(exec_builder_pool.back().getCurrentHeader()); + for (auto & exec_builder : exec_builder_pool) + { + exec_builder.setSinkOp(std::make_unique(exec_status_, req_id, res)); + exec_pool.push_back(exec_builder.build()); + } + } + + String getName() const override + { + return "ConcatSourceOp"; + } + + // ConcatSourceOp is used to merge multiple partitioned tables of storage layer, so override `getIOProfileInfo` is needed here. + IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); } + +protected: + void operatePrefixImpl() override + { + if (!popExec()) + done = true; + } + + void operateSuffixImpl() override + { + if (cur_exec) + { + cur_exec->executeSuffix(); + cur_exec.reset(); + } + exec_pool.clear(); + } + + OperatorStatus readImpl(Block & block) override + { + if unlikely (done) + return OperatorStatus::HAS_OUTPUT; + + if unlikely (res) + { + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + } + + while (true) + { + assert(cur_exec); + auto status = cur_exec->execute(); + switch (status) + { + case OperatorStatus::NEED_INPUT: + assert(res); + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + case OperatorStatus::FINISHED: + cur_exec->executeSuffix(); + cur_exec.reset(); + if (!popExec()) + { + done = true; + return OperatorStatus::HAS_OUTPUT; + } + break; + default: + return status; + } + } + } + + OperatorStatus executeIOImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(cur_exec); + auto status = cur_exec->executeIO(); + assert(status != OperatorStatus::FINISHED); + return status; + } + + OperatorStatus awaitImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(cur_exec); + auto status = cur_exec->await(); + assert(status != OperatorStatus::FINISHED); + return status; + } + +private: + bool popExec() + { + assert(!cur_exec); + if (exec_pool.empty()) + { + return false; + } + else + { + cur_exec = std::move(exec_pool.front()); + exec_pool.pop_front(); + cur_exec->executePrefix(); + return true; + } + } + +private: + std::deque exec_pool; + PipelineExecPtr cur_exec; + + Block res; + bool done = false; +}; + +class ConcatBuilderPool +{ +public: + explicit ConcatBuilderPool(size_t expect_size) + { + RUNTIME_CHECK(expect_size > 0); + pool.resize(expect_size); + } + + void add(PipelineExecGroupBuilder & group_builder) + { + RUNTIME_CHECK(group_builder.groupCnt() == 1); + for (size_t i = 0; i < group_builder.concurrency(); ++i) + { + pool[pre_index++].push_back(std::move(group_builder.getCurBuilder(i))); + if (pre_index == pool.size()) + pre_index = 0; + } + } + + void generate(PipelineExecGroupBuilder & result_builder, PipelineExecutorStatus & exec_status, const String & req_id) + { + RUNTIME_CHECK(result_builder.empty()); + for (auto & builders : pool) + { + if (builders.empty()) + { + continue; + } + else if (builders.size() == 1) + { + result_builder.addConcurrency(std::move(builders.back())); + } + else + { + result_builder.addConcurrency(std::make_unique(exec_status, req_id, builders)); + } + } + } + +private: + std::vector> pool; + size_t pre_index = 0; +}; +} // namespace DB diff --git a/dbms/src/Operators/tests/gtest_concat_source.cpp b/dbms/src/Operators/tests/gtest_concat_source.cpp new file mode 100644 index 00000000000..a8b9b75a4d9 --- /dev/null +++ b/dbms/src/Operators/tests/gtest_concat_source.cpp @@ -0,0 +1,105 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include +#include + +namespace DB::tests +{ +namespace +{ +class MockSourceOp : public SourceOp +{ +public: + MockSourceOp( + PipelineExecutorStatus & exec_status_, + const Block & output_) + : SourceOp(exec_status_, "mock") + , output(output_) + { + setHeader(output.cloneEmpty()); + } + + String getName() const override + { + return "MockSourceOp"; + } + +protected: + OperatorStatus readImpl(Block & block) override + { + std::swap(block, output); + return OperatorStatus::HAS_OUTPUT; + } + +private: + Block output; +}; +} // namespace + +class TestConcatSource : public ::testing::Test +{ +}; + +TEST_F(TestConcatSource, setBlockSink) +{ + Block res; + ASSERT_FALSE(res); + + PipelineExecutorStatus exec_status; + SetBlockSinkOp set_block_sink{exec_status, "test", res}; + Block header{ColumnGenerator::instance().generate({0, "Int32", DataDistribution::RANDOM})}; + set_block_sink.setHeader(header); + Block block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}; + set_block_sink.operatePrefix(); + set_block_sink.write(std::move(block)); + ASSERT_TRUE(res); + set_block_sink.operateSuffix(); +} + +TEST_F(TestConcatSource, concatSink) +{ + size_t block_cnt = 10; + std::vector builders; + PipelineExecutorStatus exec_status; + for (size_t i = 0; i < block_cnt; ++i) + { + PipelineExecBuilder builder; + Block block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}; + builder.setSourceOp(std::make_unique(exec_status, block)); + builders.push_back(std::move(builder)); + } + + ConcatSourceOp concat_source{exec_status, "test", builders}; + size_t actual_block_cnt = 0; + concat_source.operatePrefix(); + while (true) + { + Block tmp; + concat_source.read(tmp); + if (tmp) + ++actual_block_cnt; + else + break; + } + concat_source.operateSuffix(); + ASSERT_EQ(actual_block_cnt, block_cnt); +} + +} // namespace DB::tests