Skip to content

Commit

Permalink
Pipeline: optimize part scan (#7723)
Browse files Browse the repository at this point in the history
ref #4474, ref #6518
  • Loading branch information
SeaRise authored Jul 7, 2023
1 parent b2791df commit e98f5d6
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MultiplexInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class MultiplexInputStream final : public IProfilingBlockInputStream

if (cur_stream)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*cur_stream))
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*cur_stream))
{
child->cancel(kill);
}
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Operators/BlockInputStreamSourceOp.h>
#include <Operators/ConcatSourceOp.h>
#include <Operators/CoprocessorReaderSourceOp.h>
#include <Operators/ExpressionTransformOp.h>
#include <Operators/NullSourceOp.h>
Expand Down Expand Up @@ -1146,10 +1147,10 @@ void DAGStorageInterpreter::buildLocalExec(
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};

auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>();
// TODO Improve the performance of partition table in extreme case.
// ref https://github.com/pingcap/tiflash/issues/4474
for (const auto & table_query_info : table_query_infos)
{
PipelineExecGroupBuilder builder;
Expand All @@ -1161,7 +1162,10 @@ void DAGStorageInterpreter::buildLocalExec(
disaggregated_snap->addTask(table_id, std::move(table_snap));
}

group_builder.merge(std::move(builder));
if (has_multiple_partitions)
builder_pool.add(builder);
else
group_builder.merge(std::move(builder));
}

LOG_DEBUG(
Expand All @@ -1179,6 +1183,11 @@ void DAGStorageInterpreter::buildLocalExec(
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}

if (has_multiple_partitions)
{
builder_pool.generate(group_builder, exec_status, log->identifier());
}
}

std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ void PipelineExecGroupBuilder::addConcurrency(SourceOpPtr && source)
cur_group.back().setSourceOp(std::move(source));
}

void PipelineExecGroupBuilder::addConcurrency(PipelineExecBuilder && exec_builder)
{
RUNTIME_CHECK(exec_builder.source_op);
auto & cur_group = getCurGroup();
cur_group.push_back(std::move(exec_builder));
}

void PipelineExecGroupBuilder::reset()
{
groups.clear();
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ class PipelineExecGroupBuilder

void addGroup() { groups.emplace_back(); }

size_t groupCnt() const { return groups.size(); }

size_t concurrency() const { return getCurGroup().size(); }

bool empty() const { return getCurGroup().empty(); }

void addConcurrency(SourceOpPtr && source);

void addConcurrency(PipelineExecBuilder && exec_builder);

void reset();

void merge(PipelineExecGroupBuilder && other);
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Functions/FunctionsStringSearch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
#include <memory>
#include <mutex>

#include "Columns/IColumn.h"
#include "Common/Exception.h"
#include "common/defines.h"

#if USE_RE2_ST
#include <re2_st/re2.h>
#else
Expand Down
229 changes: 229 additions & 0 deletions dbms/src/Operators/ConcatSourceOp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Flash/Pipeline/Exec/PipelineExec.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>

#include <memory>

namespace DB
{
class SetBlockSinkOp : public SinkOp
{
public:
SetBlockSinkOp(
PipelineExecutorStatus & exec_status_,
const String & req_id,
Block & res_)
: SinkOp(exec_status_, req_id)
, res(res_)
{
}

String getName() const override
{
return "SetBlockSinkOp";
}

protected:
OperatorStatus writeImpl(Block && block) override
{
if unlikely (!block)
return OperatorStatus::FINISHED;

assert(!res);
res = std::move(block);
return OperatorStatus::NEED_INPUT;
}

private:
Block & res;
};

/// Used to merge multiple partitioned tables of storage layer.
class ConcatSourceOp : public SourceOp
{
public:
ConcatSourceOp(
PipelineExecutorStatus & exec_status_,
const String & req_id,
std::vector<PipelineExecBuilder> & exec_builder_pool)
: SourceOp(exec_status_, req_id)
{
RUNTIME_CHECK(!exec_builder_pool.empty());
setHeader(exec_builder_pool.back().getCurrentHeader());
for (auto & exec_builder : exec_builder_pool)
{
exec_builder.setSinkOp(std::make_unique<SetBlockSinkOp>(exec_status_, req_id, res));
exec_pool.push_back(exec_builder.build());
}
}

String getName() const override
{
return "ConcatSourceOp";
}

// ConcatSourceOp is used to merge multiple partitioned tables of storage layer, so override `getIOProfileInfo` is needed here.
IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); }

protected:
void operatePrefixImpl() override
{
if (!popExec())
done = true;
}

void operateSuffixImpl() override
{
if (cur_exec)
{
cur_exec->executeSuffix();
cur_exec.reset();
}
exec_pool.clear();
}

OperatorStatus readImpl(Block & block) override
{
if unlikely (done)
return OperatorStatus::HAS_OUTPUT;

if unlikely (res)
{
std::swap(block, res);
return OperatorStatus::HAS_OUTPUT;
}

while (true)
{
assert(cur_exec);
auto status = cur_exec->execute();
switch (status)
{
case OperatorStatus::NEED_INPUT:
assert(res);
std::swap(block, res);
return OperatorStatus::HAS_OUTPUT;
case OperatorStatus::FINISHED:
cur_exec->executeSuffix();
cur_exec.reset();
if (!popExec())
{
done = true;
return OperatorStatus::HAS_OUTPUT;
}
break;
default:
return status;
}
}
}

OperatorStatus executeIOImpl() override
{
if unlikely (done || res)
return OperatorStatus::HAS_OUTPUT;

assert(cur_exec);
auto status = cur_exec->executeIO();
assert(status != OperatorStatus::FINISHED);
return status;
}

OperatorStatus awaitImpl() override
{
if unlikely (done || res)
return OperatorStatus::HAS_OUTPUT;

assert(cur_exec);
auto status = cur_exec->await();
assert(status != OperatorStatus::FINISHED);
return status;
}

private:
bool popExec()
{
assert(!cur_exec);
if (exec_pool.empty())
{
return false;
}
else
{
cur_exec = std::move(exec_pool.front());
exec_pool.pop_front();
cur_exec->executePrefix();
return true;
}
}

private:
std::deque<PipelineExecPtr> exec_pool;
PipelineExecPtr cur_exec;

Block res;
bool done = false;
};

class ConcatBuilderPool
{
public:
explicit ConcatBuilderPool(size_t expect_size)
{
RUNTIME_CHECK(expect_size > 0);
pool.resize(expect_size);
}

void add(PipelineExecGroupBuilder & group_builder)
{
RUNTIME_CHECK(group_builder.groupCnt() == 1);
for (size_t i = 0; i < group_builder.concurrency(); ++i)
{
pool[pre_index++].push_back(std::move(group_builder.getCurBuilder(i)));
if (pre_index == pool.size())
pre_index = 0;
}
}

void generate(PipelineExecGroupBuilder & result_builder, PipelineExecutorStatus & exec_status, const String & req_id)
{
RUNTIME_CHECK(result_builder.empty());
for (auto & builders : pool)
{
if (builders.empty())
{
continue;
}
else if (builders.size() == 1)
{
result_builder.addConcurrency(std::move(builders.back()));
}
else
{
result_builder.addConcurrency(std::make_unique<ConcatSourceOp>(exec_status, req_id, builders));
}
}
}

private:
std::vector<std::vector<PipelineExecBuilder>> pool;
size_t pre_index = 0;
};
} // namespace DB
Loading

0 comments on commit e98f5d6

Please sign in to comment.