Skip to content

Commit

Permalink
Merge branch 'master' into data_race_in_tunnel_receiver_set
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Aug 18, 2022
2 parents a7d5c5f + d7e4e29 commit 2d9d10b
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 50 deletions.
59 changes: 37 additions & 22 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,30 @@ String getAggFuncName(
return agg_func_name;
}

/// return `duplicated agg function`->getReturnType if duplicated.
/// return `duplicated Agg/Window function`->getReturnType if duplicated.
/// or not return nullptr.
DataTypePtr findDuplicateAggFunc(
template <typename Descriptions>
DataTypePtr findDuplicateAggWindowFunc(
const String & func_string,
const AggregateDescriptions & aggregate_descriptions)
const Descriptions & descriptions)
{
for (const auto & aggregated : aggregate_descriptions)
for (const auto & description : descriptions)
{
if (aggregated.column_name == func_string)
if (description.column_name == func_string)
{
auto return_type = aggregated.function->getReturnType();
assert(return_type);
return return_type;
if constexpr (std::is_same_v<Descriptions, AggregateDescriptions>)
{
auto return_type = description.function->getReturnType();
assert(return_type);
return return_type;
}
else
{
static_assert(std::is_same_v<Descriptions, WindowFunctionDescriptions>);
auto return_type = description.window_function->getReturnType();
assert(return_type);
return return_type;
}
}
}
return nullptr;
Expand All @@ -121,7 +132,7 @@ void appendAggDescription(
AggregateDescription aggregate;
aggregate.argument_names = arg_names;
String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators);
if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions))
if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, aggregate_descriptions))
{
// agg function duplicate, don't need to build again.
aggregated_columns.emplace_back(func_string, duplicated_return_type);
Expand Down Expand Up @@ -210,7 +221,7 @@ void DAGExpressionAnalyzer::buildGroupConcat(

String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators);
/// return directly if the agg is duplicated
if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions))
if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, aggregate_descriptions))
{
aggregated_columns.emplace_back(func_string, duplicated_return_type);
return;
Expand Down Expand Up @@ -448,9 +459,8 @@ void DAGExpressionAnalyzer::appendSourceColumnsToRequireOutput(ExpressionActions
}

// This function will add new window function culumns to source_column
std::tuple<WindowDescription, NamesAndTypes> DAGExpressionAnalyzer::appendWindowColumns(const tipb::Window & window, ExpressionActionsChain::Step & step)
void DAGExpressionAnalyzer::appendWindowColumns(WindowDescription & window_description, const tipb::Window & window, ExpressionActionsChain::Step & step)
{
WindowDescription window_description;
NamesAndTypes window_columns;

if (window.func_desc_size() == 0)
Expand All @@ -472,10 +482,8 @@ std::tuple<WindowDescription, NamesAndTypes> DAGExpressionAnalyzer::appendWindow
}
else if (isWindowFunctionExpr(expr))
{
WindowFunctionDescription window_function_description;
String window_func_name = getWindowFunctionName(expr);
auto child_size = expr.children_size();

Names arg_names;
DataTypes arg_types;
TiDB::TiDBCollators arg_collators;
Expand All @@ -484,11 +492,19 @@ std::tuple<WindowDescription, NamesAndTypes> DAGExpressionAnalyzer::appendWindow
fillArgumentDetail(step.actions, expr.children(i), arg_names, arg_types, arg_collators);
}

String func_string = genFuncString(window_func_name, arg_names, arg_collators);
if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, window_description.window_functions_descriptions))
{
// window function duplicate, don't need to build again.
source_columns.emplace_back(func_string, duplicated_return_type);
continue;
}

WindowFunctionDescription window_function_description;
window_function_description.argument_names.resize(child_size);
window_function_description.argument_names = arg_names;
step.required_output.insert(step.required_output.end(), arg_names.begin(), arg_names.end());

String func_string = genFuncString(window_func_name, window_function_description.argument_names, arg_collators);
window_function_description.column_name = func_string;
window_function_description.window_function = WindowFunctionFactory::instance().get(window_func_name, arg_types);
DataTypePtr result_type = window_function_description.window_function->getReturnType();
Expand All @@ -502,7 +518,7 @@ std::tuple<WindowDescription, NamesAndTypes> DAGExpressionAnalyzer::appendWindow
}
}

return {window_description, window_columns};
window_description.add_columns = window_columns;
}

WindowDescription DAGExpressionAnalyzer::buildWindowDescription(const tipb::Window & window)
Expand All @@ -512,18 +528,17 @@ WindowDescription DAGExpressionAnalyzer::buildWindowDescription(const tipb::Wind
appendSourceColumnsToRequireOutput(step);
size_t source_size = getCurrentInputColumns().size();

auto [window_description, window_columns] = appendWindowColumns(window, step);

window_description.add_columns = window_columns;

WindowDescription window_description;
window_description.partition_by = getWindowSortDescription(window.partition_by());
window_description.order_by = getWindowSortDescription(window.order_by());
if (window.has_frame())
{
window_description.setWindowFrame(window.frame());
}

appendWindowColumns(window_description, window, step);

window_description.before_window = chain.getLastActions();
window_description.partition_by = getWindowSortDescription(window.partition_by());
window_description.order_by = getWindowSortDescription(window.order_by());
chain.finalize();
chain.clear();

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Aggregation & agg,
bool group_by_collation_sensitive);

std::tuple<WindowDescription, NamesAndTypes> appendWindowColumns(const tipb::Window & window, ExpressionActionsChain::Step & step);
void appendWindowColumns(WindowDescription & window_description, const tipb::Window & window, ExpressionActionsChain::Step & step);

WindowDescription buildWindowDescription(const tipb::Window & window);

Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,31 @@ ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Conte
}
return std::make_shared<ExpressionActions>(actions_input_column, context.getSettingsRef());
}

NamesAndTypes addSchemaProjectAction(
const ExpressionActionsPtr & expr_actions,
const NamesAndTypes & before_schema,
const String & column_prefix)
{
assert(expr_actions);
assert(!before_schema.empty());

NamesAndTypes after_schema = before_schema;
NamesWithAliases project_aliases;
std::unordered_set<String> column_name_set;
for (size_t i = 0; i < before_schema.size(); ++i)
{
const auto & before_column_name = before_schema[i].name;
String after_column_name = column_prefix + before_column_name;
/// Duplicate columns don‘t need to project.
if (column_name_set.find(before_column_name) == column_name_set.end())
{
project_aliases.emplace_back(before_column_name, after_column_name);
column_name_set.emplace(before_column_name);
}
after_schema[i].name = after_column_name;
}
expr_actions->add(ExpressionAction::project(project_aliases));
return after_schema;
}
} // namespace DB::PhysicalPlanHelper
5 changes: 5 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ namespace DB::PhysicalPlanHelper
ExpressionActionsPtr newActions(const Block & input_block, const Context & context);

ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Context & context);

NamesAndTypes addSchemaProjectAction(
const ExpressionActionsPtr & expr_actions,
const NamesAndTypes & before_schema,
const String & column_prefix = "");
} // namespace DB::PhysicalPlanHelper
3 changes: 1 addition & 2 deletions dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build(
analyzer.reset(aggregated_columns);
analyzer.appendCastAfterAgg(expr_after_agg_actions, aggregation);
/// project action after aggregation to remove useless columns.
const NamesAndTypes & schema = analyzer.getCurrentInputColumns();
expr_after_agg_actions->add(ExpressionAction::project(DB::toNames(schema)));
auto schema = PhysicalPlanHelper::addSchemaProjectAction(expr_after_agg_actions, analyzer.getCurrentInputColumns());

auto physical_agg = std::make_shared<PhysicalAggregation>(
executor_id,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ PhysicalPlanNodePtr PhysicalExchangeReceiver::build(

void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty());

auto & dag_context = *context.getDAGContext();
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build(

void PhysicalMockExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/)
{
assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty());
pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end());
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ PhysicalPlanNodePtr PhysicalMockTableScan::build(

void PhysicalMockTableScan::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/)
{
assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty());
pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end());
}

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/Planner/plans/PhysicalWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ PhysicalPlanNodePtr PhysicalWindow::build(
WindowDescription window_description = analyzer.buildWindowDescription(window);

/// project action after window to remove useless columns.
const auto & schema = window_description.after_window_columns;
window_description.after_window->add(ExpressionAction::project(DB::toNames(schema)));
/// For window, we need to add column_prefix to distinguish it from the output of the next window.
/// such as `window(row_number()) <-- window(row_number())`.
auto schema = PhysicalPlanHelper::addSchemaProjectAction(
window_description.after_window,
window_description.after_window_columns,
fmt::format("{}_", executor_id));
window_description.after_window_columns = schema;

auto physical_window = std::make_shared<PhysicalWindow>(
executor_id,
Expand Down Expand Up @@ -78,7 +83,7 @@ void PhysicalWindow::transformImpl(DAGPipeline & pipeline, Context & context, si
pipeline.firstStream() = std::make_shared<WindowBlockInputStream>(pipeline.firstStream(), window_description, log->identifier());
}

executeExpression(pipeline, window_description.after_window, log, "cast after window");
executeExpression(pipeline, window_description.after_window, log, "expr after window");
}

void PhysicalWindow::finalize(const Names & parent_require)
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,13 @@ try
execute(
request,
/*expected_physical_plan=*/R"(
<Projection, window_2> | is_tidb_operator: false, schema: <window_2_partition, Nullable(Int64)>, <window_2_order, Nullable(Int64)>, <window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Window, window_2> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>, <CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Projection, window_2> | is_tidb_operator: false, schema: <window_2_window_2_partition, Nullable(Int64)>, <window_2_window_2_order, Nullable(Int64)>, <window_2_window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Window, window_2> | is_tidb_operator: true, schema: <window_2_partition, Nullable(Int64)>, <window_2_order, Nullable(Int64)>, <window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<WindowSort, sort_1> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>
<MockExchangeReceiver, exchange_receiver_0> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>)",
/*expected_streams=*/R"(
Expression: <final projection>
Expression: <cast after window>
Expression: <expr after window>
Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting, limit = 0
PartialSorting: limit = 0
Expand All @@ -314,13 +314,13 @@ Expression: <final projection>
execute(
request,
/*expected_physical_plan=*/R"(
<Projection, window_2> | is_tidb_operator: false, schema: <window_2_partition, Nullable(Int64)>, <window_2_order, Nullable(Int64)>, <window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Window, window_2> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>, <CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Projection, window_2> | is_tidb_operator: false, schema: <window_2_window_2_partition, Nullable(Int64)>, <window_2_window_2_order, Nullable(Int64)>, <window_2_window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<Window, window_2> | is_tidb_operator: true, schema: <window_2_partition, Nullable(Int64)>, <window_2_order, Nullable(Int64)>, <window_2_CAST(row_number()_collator , Nullable(Int64)_String)_collator_0 , Nullable(Int64)>
<WindowSort, sort_1> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>
<MockExchangeReceiver, exchange_receiver_0> | is_tidb_operator: true, schema: <partition, Nullable(Int64)>, <order, Nullable(Int64)>)",
/*expected_streams=*/R"(
Expression: <final projection>
Expression: <cast after window>
Expression: <expr after window>
Window: <enable fine grained shuffle>, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting: <enable fine grained shuffle>, limit = 0
PartialSorting: <enable fine grained shuffle>: limit = 0
Expand Down
37 changes: 29 additions & 8 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class ExecutorAggTestRunner : public DB::tests::ExecutorTest
{{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}},
{toNullableVec<String>("s1", {"banana", {}, "banana"}),
toNullableVec<String>("s2", {"apple", {}, "banana"})});

context.addMockTable({"test_db", "test_table"},
{{"s1", TiDB::TP::TypeLongLong}, {"s2", TiDB::TP::TypeLongLong}},
{toVec<Int64>("s1", {1, 2, 3}),
toVec<Int64>("s2", {1, 2, 3})});
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(std::pair<String, String> src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj)
Expand Down Expand Up @@ -317,22 +322,38 @@ try
.scan("aggnull_test", "t1")
.aggregation({Max(col("s1"))}, {})
.build(context);
{
ASSERT_COLUMNS_EQ_R(executeStreams(request),
createColumns({toNullableVec<String>({"banana"})}));
}
executeWithConcurrency(request, {{toNullableVec<String>({"banana"})}});

request = context
.scan("aggnull_test", "t1")
.aggregation({}, {col("s1")})
.build(context);
{
ASSERT_COLUMNS_EQ_UR(executeStreams(request),
createColumns({toNullableVec<String>("s1", {{}, "banana"})}));
}
executeWithConcurrency(request, {{toNullableVec<String>("s1", {{}, "banana"})}});
}
CATCH

TEST_F(ExecutorAggTestRunner, RepeatedAggregateFunction)
try
{
/// select max(s1), max(s1) from test_db.test_table;
auto request = context
.scan("test_db", "test_table")
.aggregation({Max(col("s1")), Max(col("s1"))}, {})
.build(context);
executeWithConcurrency(
request,
{{toNullableVec<Int64>({3})}, {toNullableVec<Int64>({3})}});

/// select max(s1), max(s1), sum(s2) from test_db.test_table;
request = context
.scan("test_db", "test_table")
.aggregation({Max(col("s1")), Max(col("s1")), Sum(col("s2"))}, {})
.build(context);
executeWithConcurrency(
request,
{{toNullableVec<Int64>({3})}, {toNullableVec<Int64>({3})}, {toVec<UInt64>({6})}});
}
CATCH

// TODO support more type of min, max, count.
// support more aggregation functions: sum, forst_row, group_concat
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Flash/tests/gtest_planner_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ try
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
Expression: <cast after window>
Expression: <expr after window>
Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting, limit = 0
Union: <for partial order>
Expand All @@ -634,7 +634,7 @@ Union: <for test>
Expression x 10: <final projection>
Expression: <projection>
SharedQuery: <restore concurrency>
Expression: <cast after window>
Expression: <expr after window>
Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting, limit = 0
Union: <for partial order>
Expand All @@ -655,7 +655,7 @@ Union: <for test>
Expression x 10: <final projection>
Expression: <projection>
SharedQuery: <restore concurrency>
Expression: <cast after window>
Expression: <expr after window>
Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
Union: <merge into one for window input>
Expression x 10: <projection>
Expand Down Expand Up @@ -684,7 +684,7 @@ try
String expected = R"(
Union: <for test>
Expression x 10: <final projection>
Expression: <cast after window>
Expression: <expr after window>
Window: <enable fine grained shuffle>, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting: <enable fine grained shuffle>, limit = 0
PartialSorting: <enable fine grained shuffle>: limit = 0
Expand Down Expand Up @@ -719,7 +719,7 @@ Union: <for test>
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
Expression: <cast after window>
Expression: <expr after window>
Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current}
MergeSorting, limit = 0
Union: <for partial order>
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/tests/gtest_projection_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
return context.scan(db_name, table_name).project(param).build(context);
};

static const size_t max_concurrency_level = 10;

void executeWithConcurrency(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & expect_columns)
{
WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN
for (size_t i = 1; i < 10; i += 2)
for (size_t i = 1; i <= max_concurrency_level; i += 2)
{
ASSERT_COLUMNS_EQ_UR(executeStreams(request, i), expect_columns);
}
Expand Down
Loading

0 comments on commit 2d9d10b

Please sign in to comment.