Planner: Support Planner Interpreter #5321

Merged: 26 commits on Aug 10, 2022

Commits
44c4400
Feature Branch: merge master to `planner_refactory` branch (#5034)
SeaRise Jun 8, 2022
aabe213
Introduce physical plan and add switch (#4820)
SeaRise Jun 13, 2022
07b1b20
Planner: Support common physical plans (#5143)
SeaRise Jul 8, 2022
d027469
merge master to planner_refactory
SeaRise Jul 18, 2022
7693e14
merge master to Feature branch: planner_refactory ref #4739
SeaRise Jul 18, 2022
467fb68
merge master to planner_refactory
SeaRise Jul 21, 2022
aa20f61
Feature Branch: merge master to planner_refactory (#5426)
ti-chi-bot Jul 25, 2022
42b4055
Planner: support table scan (#5335)
SeaRise Jul 25, 2022
a528f48
Planner: support Join (#5320)
SeaRise Jul 25, 2022
d9bcbde
Planner: support window (#5363)
SeaRise Jul 26, 2022
0f26fe8
Planner: planner without `DAGQueryBlock` (#5381)
SeaRise Aug 2, 2022
45088e9
merge master to planner_refactory
SeaRise Aug 3, 2022
120214f
open License checker for planner_refactory
SeaRise Aug 3, 2022
67a084c
Feature Branch: merge master to planner_refactory (#5493)
ti-chi-bot Aug 3, 2022
06cf87e
revert License checker for feature branch: planner_refactory
SeaRise Aug 3, 2022
609b51d
address comments from review #5321
SeaRise Aug 4, 2022
387b366
Merge branch 'master' into planner_refactory
SeaRise Aug 8, 2022
04e2fba
fix conflict from master
SeaRise Aug 8, 2022
ca94ba0
address comments
SeaRise Aug 9, 2022
98581c2
address comments
SeaRise Aug 10, 2022
dca94c6
fix clang tidy
SeaRise Aug 10, 2022
2c12eca
Update dbms/src/Core/ColumnsWithTypeAndName.h
SeaRise Aug 10, 2022
28ae5f0
Update dbms/src/Flash/Planner/plans/PhysicalBinary.h
SeaRise Aug 10, 2022
9f3bbd9
Update dbms/src/Flash/Planner/plans/PhysicalUnary.h
SeaRise Aug 10, 2022
fb13269
Update dbms/src/Flash/Planner/plans/PhysicalBinary.h
SeaRise Aug 10, 2022
f1b9c35
Merge branch 'master' into planner_refactory
ti-chi-bot Aug 10, 2022
17 changes: 17 additions & 0 deletions dbms/src/Common/TiFlashException.h
@@ -96,6 +96,23 @@ namespace DB
"This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \
""); \
) \
C(Planner, \
E(BadRequest, "Bad TiDB DAGRequest.", \
"This error is usually caused by an incorrect TiDB DAGRequest. \n" \
"Please contact the developers, \n" \
"preferably with information about your cluster (logs, topology, etc.).", \
""); \
E(Unimplemented, "Some features are unimplemented.", \
"This error may be caused by mismatched TiDB and TiFlash versions, \n" \
"and should not occur in the common case. \n" \
"Please contact the developers, \n" \
"preferably with information about your cluster (logs, topology, etc.).", \
""); \
E(Internal, "TiFlash Planner internal error.", \
"Please contact the developers, \n" \
"preferably with information about your cluster (logs, topology, etc.).", \
""); \
) \
C(Table, \
E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \
"TiFlash will sync the newest schema from TiDB before processing every query. \n" \
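
The new Errors::Planner codes are thrown through the existing TiFlashException mechanism. A minimal sketch of a call site, under the assumption of a hypothetical check (isSupportedExecutor is an invented name, not part of the diff):

// Hypothetical planner call site; isSupportedExecutor is a made-up helper
// used only to illustrate the new error category.
if (!isSupportedExecutor(executor))
    throw TiFlashException("Unsupported executor in DAGRequest", Errors::Planner::Unimplemented);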
9 changes: 9 additions & 0 deletions dbms/src/Core/Block.cpp
@@ -49,6 +49,15 @@ Block::Block(const ColumnsWithTypeAndName & data_)
}


Block::Block(const NamesAndTypes & names_and_types)
{
data.reserve(names_and_types.size());
for (const auto & name_and_type : names_and_types)
data.emplace_back(name_and_type.type, name_and_type.name);
initializeIndexByName();
}


void Block::initializeIndexByName()
{
for (size_t i = 0, size = data.size(); i < size; ++i)
1 change: 1 addition & 0 deletions dbms/src/Core/Block.h
@@ -51,6 +51,7 @@ class Block
Block() = default;
Block(std::initializer_list<ColumnWithTypeAndName> il);
explicit Block(const ColumnsWithTypeAndName & data_);
explicit Block(const NamesAndTypes & names_and_types);

/// insert the column at the specified position
void insert(size_t position, const ColumnWithTypeAndName & elem);
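
A minimal usage sketch of the new schema-only constructor, assuming the usual DataTypes headers; the resulting Block is a header: each column carries a name and type but no materialized data.

#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

using namespace DB;

// Illustration only: build a header Block for a two-column plan output.
Block makeHeader()
{
    NamesAndTypes schema;
    schema.emplace_back("id", std::make_shared<DataTypeInt64>());
    schema.emplace_back("name", std::make_shared<DataTypeString>());
    return Block(schema); // two columns, zero rows; column pointers stay empty
}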
27 changes: 27 additions & 0 deletions dbms/src/Core/NamesAndTypes.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBuffer.h>
@@ -31,6 +32,32 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
}

String dumpJsonStructure(const NamesAndTypes & names_and_types)
{
FmtBuffer bf;
bf.append("[");
bf.joinStr(
names_and_types.cbegin(),
names_and_types.cend(),
[](const auto & name_and_type, FmtBuffer & fb) {
fb.fmtAppend(
R"json({{"name":"{}","type":{}}})json",
name_and_type.name,
(name_and_type.type ? "\"" + name_and_type.type->getName() + "\"" : "null"));
},
", ");
bf.append("]");
return bf.toString();
}

Names toNames(const NamesAndTypes & names_and_types)
{
Names names;
names.reserve(names_and_types.size());
for (const auto & name_and_type : names_and_types)
names.push_back(name_and_type.name);
return names;
}

void NamesAndTypesList::readText(ReadBuffer & buf)
{
4 changes: 4 additions & 0 deletions dbms/src/Core/NamesAndTypes.h
@@ -50,6 +50,10 @@ struct NameAndTypePair

using NamesAndTypes = std::vector<NameAndTypePair>;

String dumpJsonStructure(const NamesAndTypes & names_and_types);

Names toNames(const NamesAndTypes & names_and_types);

class NamesAndTypesList : public std::list<NameAndTypePair>
{
public:
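
An illustrative snippet for the two new helpers (the schema values are invented): dumpJsonStructure is convenient for logging a plan node's output schema, and toNames feeds APIs that only accept column names.

#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

using namespace DB;

void dumpSchemaExample()
{
    NamesAndTypes schema{{"col_a", std::make_shared<DataTypeInt64>()},
                         {"col_b", std::make_shared<DataTypeString>()}};
    // json == [{"name":"col_a","type":"Int64"},{"name":"col_b","type":"String"}]
    String json = dumpJsonStructure(schema);
    // names == {"col_a", "col_b"}
    Names names = toNames(schema);
}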
3 changes: 3 additions & 0 deletions dbms/src/Flash/CMakeLists.txt
@@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

@@ -25,5 +27,6 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
8 changes: 1 addition & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -27,6 +27,7 @@
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
@@ -117,13 +118,6 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -19,13 +19,12 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <pingcap/Exception.h>
@@ -89,10 +88,9 @@ void DAGDriver<batch>::execute()
try
{
auto start_time = Clock::now();
DAGQuerySource dag(context);
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
@@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

const Context & getContext() const { return context; }

void reset(const std::vector<NameAndTypePair> & source_columns_)
{
source_columns = source_columns_;
prepared_sets.clear();
}

const std::vector<NameAndTypePair> & getCurrentInputColumns() const;

DAGPreparedSets & getPreparedSets() { return prepared_sets; }
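
reset() lets the planner reuse a single analyzer while walking a physical plan instead of constructing one per node. A hedged fragment; the plan-walking context and variable names are assumptions, not taken from the diff:

// Rebind the shared analyzer to the current node's input schema;
// prepared sets built for the previous node are discarded.
analyzer->reset(child_output_schema); // child_output_schema: std::vector<NameAndTypePair>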
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -38,6 +38,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
@@ -161,7 +162,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
{
if (context.columnsForTestEmpty() || context.columnsForTest(table_scan.getTableScanExecutorID()).empty())
{
auto names_and_types = genNamesAndTypes(table_scan);
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
Expand All @@ -180,7 +181,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
const auto push_down_filter = PushDownFilter::toPushDownFilter(query_block.selection_name, query_block.selection);
const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);

DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
storage_interpreter.execute(pipeline);
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
@@ -0,0 +1,45 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/executor.pb.h>

namespace DB
{
static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

struct FineGrainedShuffle
{
explicit FineGrainedShuffle(const tipb::Executor * executor)
: stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0)
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
{}

bool enable() const
{
return enableFineGrainedShuffle(stream_count);
}

const UInt64 stream_count;
const UInt64 batch_size;
};
} // namespace DB
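
A hedged usage sketch of the new struct; the exchange_sender pointer and the writer calls are placeholders for whatever the real call site does with stream_count and batch_size:

// exchange_sender: const tipb::Executor * that may carry the fine-grained-shuffle fields.
FineGrainedShuffle fine_grained_shuffle(exchange_sender);
if (fine_grained_shuffle.enable())
{
    // Partition the output into stream_count streams, flushing every batch_size
    // rows (the writer API below is invented for illustration).
    writer.setPartitionNum(fine_grained_shuffle.stream_count);
    writer.setBatchSize(fine_grained_shuffle.batch_size);
}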
34 changes: 25 additions & 9 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
@@ -11,34 +11,50 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Storages/MutableSupport.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan)
namespace
{
DataTypePtr getPkType(const ColumnInfo & column_info)
{
const auto & pk_data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
/// primary key type must be tidb_pk_column_int_type or tidb_pk_column_string_type.
RUNTIME_CHECK(
pk_data_type->equals(*MutableSupport::tidb_pk_column_int_type) || pk_data_type->equals(*MutableSupport::tidb_pk_column_string_type),
Exception(
fmt::format(
"Actual pk_data_type {} is not {} or {}",
pk_data_type->getName(),
MutableSupport::tidb_pk_column_int_type->getName(),
MutableSupport::tidb_pk_column_string_type->getName()),
ErrorCodes::LOGICAL_ERROR));
return pk_data_type;
}
} // namespace

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
TiDB::ColumnInfo column_info;
const auto & ci = table_scan.getColumns()[i];
column_info.tp = static_cast<TiDB::TP>(ci.tp());
column_info.id = ci.column_id();

auto column_info = TiDB::toTiDBColumnInfo(table_scan.getColumns()[i]);
switch (column_info.id)
{
case TiDBPkColumnID:
// TODO: need to check if the type of pk_handle_columns matches the type used in the delta merge tree.
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getPkType(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("mock_table_scan_{}", i), getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
@@ -19,10 +19,11 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/StringRef.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan);
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
} // namespace DB
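
The added column_prefix parameter lets each call site label the generated default columns; the old hard-coded "mock_table_scan" prefix becomes just one choice. A brief sketch mirroring the interpreter call shown earlier in this diff:

// Default columns become "mock_table_scan_0", "mock_table_scan_1", ...
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);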
11 changes: 8 additions & 3 deletions dbms/src/Flash/Coprocessor/PushDownFilter.cpp
@@ -49,17 +49,22 @@ tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor
}
}

PushDownFilter PushDownFilter::toPushDownFilter(const String & executor_id, const tipb::Executor * executor)
PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor)
{
if (!executor || !executor->has_selection())
{
return {"", {}};
}

return pushDownFilterFrom(executor_id, executor->selection());
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection)
{
std::vector<const tipb::Expr *> conditions;
for (const auto & condition : executor->selection().conditions())
for (const auto & condition : selection.conditions())
conditions.push_back(&condition);

return {executor_id, conditions};
}
} // namespace DB
} // namespace DB
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/PushDownFilter.h
@@ -23,7 +23,11 @@ namespace DB
{
struct PushDownFilter
{
static PushDownFilter toPushDownFilter(const String & executor_id, const tipb::Executor * executor);
static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor);

static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection);

PushDownFilter() = default;

PushDownFilter(
const String & executor_id_,
Expand All @@ -36,4 +40,4 @@ struct PushDownFilter
String executor_id;
std::vector<const tipb::Expr *> conditions;
};
} // namespace DB
} // namespace DB
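
An illustrative pair of calls for the renamed factory (the surrounding objects are placeholders): the tipb::Executor overload handles a parent that may or may not carry a selection child, while the new tipb::Selection overload serves callers that have already unwrapped the selection.

// From a parent executor that may or may not have a selection child:
auto filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);
// Directly from an already-unwrapped selection (executor_id and selection are placeholders):
auto filter_from_selection = PushDownFilter::pushDownFilterFrom(executor_id, selection);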
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -18,6 +18,7 @@
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <fmt/core.h>
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -30,8 +30,8 @@
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
@@ -319,8 +319,7 @@ void MPPTask::preprocess()
{
auto start_time = Clock::now();
initExchangeReceivers();
DAGQuerySource dag(*context);
executeQuery(dag, *context, false, QueryProcessingStage::Complete);
executeQuery(*context);
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);