Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planner: Support common physical plans #5143

Merged
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
d987c89
introd physical plan
SeaRise Apr 25, 2022
e24489a
add source
SeaRise Apr 25, 2022
99c6a74
fix
SeaRise Apr 25, 2022
c6363c8
fix
SeaRise Apr 25, 2022
bb44d1f
fix
SeaRise Apr 25, 2022
c51c152
add optimize
SeaRise May 10, 2022
a5647fd
format
SeaRise May 10, 2022
39cc3ec
license
SeaRise May 10, 2022
3960244
plan type
SeaRise May 10, 2022
9b86c41
u
SeaRise May 10, 2022
4bde83b
license check
SeaRise May 25, 2022
4137c52
Merge branch 'planner_refactory' into introduce_physical_plan_add_switch
SeaRise May 25, 2022
328e18f
Merge branch 'introduce_physical_plan_add_switch' of https://github.c…
SeaRise May 25, 2022
e6d0a43
Merge branch 'planner_refactory' into introduce_physical_plan_add_switch
SeaRise May 26, 2022
2fc02b7
add tests
SeaRise May 26, 2022
6a91f86
f
SeaRise May 26, 2022
dcaa84a
Errors::Planner
SeaRise May 31, 2022
8ac8ce4
Merge branch 'planner_refactory' into introduce_physical_plan_add_switch
SeaRise Jun 8, 2022
64b6d2c
add tests and test
SeaRise Jun 8, 2022
ca45abf
add parallel tests
SeaRise Jun 8, 2022
5a95ca0
fix
SeaRise Jun 8, 2022
6f9dc61
format
SeaRise Jun 9, 2022
edf26bc
address comment
SeaRise Jun 9, 2022
d31b310
address comments
SeaRise Jun 10, 2022
472dd1c
tmp save
SeaRise Jun 13, 2022
dd5d89b
Merge branch 'planner_refactory' into new_support_common_plans
SeaRise Jun 13, 2022
6cc1a9c
limit and topn
SeaRise Jun 14, 2022
c0e5e5b
filter
SeaRise Jun 14, 2022
237dd2e
agg
SeaRise Jun 14, 2022
2b85873
ExchangeSender
SeaRise Jun 14, 2022
09d4287
receiver
SeaRise Jun 14, 2022
0c3f6b3
projection
SeaRise Jun 14, 2022
27346c7
fix
SeaRise Jun 15, 2022
9d16d81
fix
SeaRise Jun 15, 2022
508e838
fix
SeaRise Jun 20, 2022
b169bca
add physical plan test
SeaRise Jun 20, 2022
dcc2233
fmt
SeaRise Jun 20, 2022
0a1dd00
fix lience
SeaRise Jun 20, 2022
1bfb173
add gtest_physical_plan
SeaRise Jun 20, 2022
95e832e
fmt
SeaRise Jun 20, 2022
9f908d1
revert
SeaRise Jun 20, 2022
840c9f7
fix executor test
SeaRise Jun 20, 2022
40009eb
format
SeaRise Jun 20, 2022
0408cd2
fix tests
SeaRise Jun 20, 2022
2975694
fix ut
SeaRise Jun 21, 2022
b00a874
fix projection bug
SeaRise Jun 21, 2022
362acb5
format
SeaRise Jun 21, 2022
90f9185
update
SeaRise Jun 21, 2022
47b552a
revert useless change
SeaRise Jun 21, 2022
15aac9c
fix test from https://github.com/pingcap/tiflash/pull/5158#discussion…
SeaRise Jun 21, 2022
bd74ec9
mini refactor
SeaRise Jun 22, 2022
53264e0
address comments
SeaRise Jun 22, 2022
7037098
fix bug of physical agg
SeaRise Jun 23, 2022
130da74
refactor tests
SeaRise Jun 23, 2022
c265c58
update
SeaRise Jun 23, 2022
7ace19d
add project action in cast after agg
SeaRise Jun 24, 2022
8e213b6
address comment
SeaRise Jul 5, 2022
c0eab66
address comments
SeaRise Jul 8, 2022
d3a0cc4
address comment
SeaRise Jul 8, 2022
8776c18
address comment: change PhysicalPlan --> PhysicalPlanNode, PhysicalPl…
SeaRise Jul 8, 2022
564eeea
format
SeaRise Jul 8, 2022
240d6d0
fix license-checker fail like https://github.com/pingcap/tiflash/pull…
SeaRise Jul 8, 2022
dc3069f
Merge branch 'planner_refactory' into new_support_common_plans
ti-chi-bot Jul 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -176,21 +177,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
{
// generate sample block
ColumnsWithTypeAndName columns;
for (auto & dag_col : remote_reader->getOutputSchema())
{
auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second);
ColumnWithTypeAndName col(tp, dag_col.first);
columns.emplace_back(col);
}
for (size_t i = 0; i < source_num; i++)
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(columns);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
}

Block getHeader() const override { return sample_block; }
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ std::unordered_map<String, tipb::ScalarFuncSig> func_name_to_sig({
{"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime},
{"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime},
{"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime},
{"concat", tipb::ScalarFuncSig::Concat},
{"round_int", tipb::ScalarFuncSig::RoundInt},
{"round_uint", tipb::ScalarFuncSig::RoundInt},
{"round_dec", tipb::ScalarFuncSig::RoundDec},
Expand Down Expand Up @@ -455,6 +456,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::Concat:
{
expr->set_sig(it_sig->second);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeString);
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::RoundInt:
case tipb::ScalarFuncSig::RoundWithFracInt:
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap
return profile_streams_map;
}

void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit)
{
final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit);
}

void DAGContext::initExecutorIdToJoinIdMap()
{
// only mpp task has join executor
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ class DAGContext
return sql_mode & f;
}

void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit);

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);
Expand Down
24 changes: 17 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,30 +1124,40 @@ std::pair<bool, BoolVec> DAGExpressionAnalyzer::isCastRequiredForRootFinalProjec
return std::make_pair(need_append_type_cast, std::move(need_append_type_cast_vec));
}

NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);
throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR);

bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets);
assert(need_append_type_cast_vec.size() == output_offsets.size());

auto & step = initAndGetLastStep(chain);

if (need_append_timezone_cast || need_append_type_cast)
{
// after appendCastForRootFinalProjection, source_columns has been modified.
appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
}

// generate project aliases from source_columns.
NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets);
return genRootFinalProjectAliases(column_prefix, output_offsets);
}

NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
auto & step = initAndGetLastStep(chain);

NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info);

for (const auto & name : final_project)
{
Expand Down
63 changes: 38 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

const Context & getContext() const { return context; }

void reset(const std::vector<NameAndTypePair> & source_columns_)
{
source_columns = source_columns_;
prepared_sets.clear();
}

const std::vector<NameAndTypePair> & getCurrentInputColumns() const;

DAGPreparedSets & getPreparedSets() { return prepared_sets; }
Expand Down Expand Up @@ -102,6 +108,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const String & column_prefix) const;

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

// Generate a project action for root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same.
NamesWithAliases appendFinalProjectForRootQueryBlock(
Expand All @@ -111,6 +119,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const String & column_prefix,
bool keep_session_timezone_info);

NamesWithAliases buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info);

String getActions(
const tipb::Expr & expr,
const ExpressionActionsPtr & actions,
Expand Down Expand Up @@ -153,17 +168,37 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Window & window,
const size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void appendCastAfterAgg(
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
String buildTupleFunctionForGroupConcat(
const tipb::Expr & expr,
SortDescription & sort_desc,
Expand All @@ -187,22 +222,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
bool empty_input_as_null);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void fillArgumentDetail(
const ExpressionActionsPtr & actions,
const tipb::Expr & arg,
Expand Down Expand Up @@ -275,12 +294,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const String & column_name);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

NamesWithAliases genRootFinalProjectAliases(
const String & column_prefix,
const std::vector<Int32> & output_offsets) const;
Expand Down
Loading