Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: support topn #6805

Merged
merged 12 commits into from
Feb 15, 2023
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
// Only support mock table_scan/exchange_sender/exchange_receiver in test mode now.
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypeExchangeSender:
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalTopN.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/FinalizeHelper.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Flash/Planner/Plans/PhysicalTopN.h>
#include <Interpreters/Context.h>
#include <Operators/ExpressionTransformOp.h>
#include <Operators/TopNTransformOp.h>

namespace DB
{
Expand Down Expand Up @@ -65,6 +68,19 @@ void PhysicalTopN::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & c
orderStreams(pipeline, max_streams, order_descr, limit, false, context, log);
}

void PhysicalTopN::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t /*concurrency*/)
{
if (!before_sort_actions->getActions().empty())
{
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(group_builder.exec_status, before_sort_actions, log->identifier()));
});
}
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<TopNTransformOp>(group_builder.exec_status, order_descr, limit, context.getSettingsRef().max_block_size, log->identifier()));
});
}

void PhysicalTopN::finalize(const Names & parent_require)
{
Names required_output = parent_require;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalTopN.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class PhysicalTopN : public PhysicalUnary

const Block & getSampleBlock() const override;

void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency) override;

private:
void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override;

Expand Down
70 changes: 8 additions & 62 deletions dbms/src/Flash/tests/gtest_pipeline_interpreter.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Limit|limit_2 -> Limi
~test_suite_name: StrangeQuery
~result_index: 2
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 8
Union: <for partial order>
PartialSorting x 10: limit = 8
SharedQuery: <restore concurrency>
MergeSorting, limit = 9
Union: <for partial order>
PartialSorting x 10: limit = 9
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|topn_3 -> Projection|NonTiDBOperator
@
~test_suite_name: SingleQueryBlock
~result_index: 0
Expand Down Expand Up @@ -103,21 +89,12 @@ Union: <for test>
~test_suite_name: ParallelQuery
~result_index: 6
~result:
Expression: <final projection>
MergeSorting, limit = 10
PartialSorting: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: ParallelQuery
~result_index: 7
~result:
Union: <for test>
Expression x 5: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 5: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: ParallelQuery
~result_index: 8
Expand Down Expand Up @@ -224,23 +201,12 @@ MockExchangeSender
~test_suite_name: ParallelQuery
~result_index: 18
~result:
Union: <for test>
MockExchangeSender x 10
Expression: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
@
~test_suite_name: ParallelQuery
~result_index: 19
~result:
MockExchangeSender
Expression: <final projection>
MergeSorting, limit = 10
PartialSorting: limit = 10
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
@
~test_suite_name: ParallelQuery
~result_index: 20
Expand Down Expand Up @@ -280,15 +246,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|pro
~test_suite_name: MultipleQueryBlockWithSource
~result_index: 1
~result:
Union: <for test>
Expression x 10: <final projection>
Expression: <projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
Expression: <projection>
MockTableScan
pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> Projection|NonTiDBOperator
@
~test_suite_name: MultipleQueryBlockWithSource
~result_index: 2
Expand Down Expand Up @@ -393,13 +351,7 @@ Union: <for test>
~test_suite_name: FineGrainedShuffle
~result_index: 1
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockExchangeReceiver
pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: FineGrainedShuffle
~result_index: 2
Expand All @@ -417,13 +369,7 @@ Union: <for test>
~test_suite_name: FineGrainedShuffle
~result_index: 3
~result:
Union: <for test>
Expression x 10: <final projection>
SharedQuery: <restore concurrency>
MergeSorting, limit = 10
Union: <for partial order>
PartialSorting x 10: limit = 10
MockExchangeReceiver
pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
@
~test_suite_name: FineGrainedShuffleJoin
~result_index: 0
Expand Down
140 changes: 66 additions & 74 deletions dbms/src/Flash/tests/gtest_topn_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
toNullableVec<String>(col_name[1], col_gender),
toNullableVec<String>(col_name[2], col_country),
toNullableVec<Int32>(col_name[3], col_salary)});
context.addMockTable({db_name, empty_name},
{{col_name[0], TiDB::TP::TypeLong},
{col_name[1], TiDB::TP::TypeString},
{col_name[2], TiDB::TP::TypeString},
{col_name[3], TiDB::TP::TypeLong}},
{toNullableVec<Int32>(col_name[0], {}),
toNullableVec<String>(col_name[1], {}),
toNullableVec<String>(col_name[2], {}),
toNullableVec<Int32>(col_name[3], {})});

/// table with 200 rows
{
Expand Down Expand Up @@ -79,7 +88,7 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
return context.scan(db_name, table_name).topN(col_name, is_desc, limit_num).build(context);
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {})
std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockAstVec out_proj_ast = {})
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
{
if (func_proj_ast.empty())
return context.scan(db_name, table_name).topN(order_by_items, limit).build(context);
Expand All @@ -100,6 +109,9 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest
ColumnWithNullableString col_gender{"female", "female", "male", "female", "male", "male"};
ColumnWithNullableString col_country{"korea", "usa", "usa", "china", "china", "china"};
ColumnWithNullableInt32 col_salary{1300, 0, {}, 900, {}, -300};

// empty table
const String empty_name{"empty_table"};
};

TEST_F(TopNExecutorTestRunner, TopN)
Expand All @@ -113,43 +125,18 @@ try
size_t col_data_num = col0.size();
for (size_t i = 1; i <= 1; ++i)
{
bool is_desc;
is_desc = static_cast<bool>(i); /// Set descent or ascent
if (is_desc)
sort(col0.begin(), col0.end(), std::greater<ColStringNullableType>()); /// Sort col0 for the following comparison
else
sort(col0.begin(), col0.end());

bool is_desc = static_cast<bool>(i); /// Set descent or ascent
for (size_t limit_num = 0; limit_num <= col_data_num + 5; ++limit_num)
{
request = buildDAGRequest(table_single_name, single_col_name, is_desc, limit_num);

expect_cols.clear();
if (limit_num == 0 || limit_num > col_data_num)
expect_cols.push_back({toNullableVec<String>(single_col_name, ColumnWithNullableString(col0.begin(), col0.end()))});
else
expect_cols.push_back({toNullableVec<String>(single_col_name, ColumnWithNullableString(col0.begin(), col0.begin() + limit_num))});

executeAndAssertColumnsEqual(request, expect_cols.back());
SortInfos sort_infos{{0, is_desc}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}
}

{
/// Test multi-columns
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{36, 34, 32, 27, {}, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "male", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"china", "china", "usa", "usa", "china", "korea"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{900, -300, {}, 0, {}, 1300})},
{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{32, {}, 34, 27, 36, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "china", "korea"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{{}, {}, -300, 0, 900, 1300})},
{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{34, {}, 32, 36, {}, 27}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"china", "china", "usa", "china", "korea", "usa"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{-300, {}, {}, 900, 1300, 0})}};

std::vector<MockOrderByItemVec> order_by_items{
/// select * from clerk order by age DESC, gender DESC;
{MockOrderByItem(col_name[0], true), MockOrderByItem(col_name[1], true)},
Expand All @@ -158,12 +145,15 @@ try
/// select * from clerk order by gender DESC, country ASC, salary DESC;
{MockOrderByItem(col_name[1], true), MockOrderByItem(col_name[2], false), MockOrderByItem(col_name[3], true)}};

size_t test_num = expect_cols.size();
std::vector<SortInfos> infos{
{{0, true}, {1, true}},
{{1, true}, {3, false}},
{{1, true}, {2, false}, {3, true}}};

for (size_t i = 0; i < test_num; ++i)
for (size_t i = 0; i < order_by_items.size(); ++i)
{
request = buildDAGRequest(table_name, order_by_items[i], 100);
executeAndAssertColumnsEqual(request, expect_cols[i]);
executeAndAssertSortedBlocks(request, infos[i]);
}
}
}
Expand All @@ -173,8 +163,11 @@ TEST_F(TopNExecutorTestRunner, TopNFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
std::vector<ColumnsWithTypeAndName> expect_cols;
MockColumnNameVec output_projection{col_name[0], col_name[1], col_name[2], col_name[3]};
std::vector<MockAstVec> output_projections{
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), And(col("age"), col("salary"))},
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), eq(col("age"), col("salary"))},
{col(col_name[0]), col(col_name[1]), col(col_name[2]), col(col_name[3]), gt(col("age"), col("salary"))}};

MockAstVec func_projection; // Do function operation for topn
MockOrderByItemVec order_by_items;
ASTPtr col0_ast = col(col_name[0]);
Expand All @@ -183,57 +176,41 @@ try
ASTPtr col3_ast = col(col_name[3]);
ASTPtr func_ast;

/// "and" function
{
/// "and" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{{}, {}, 32, 27, 36, 34}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"korea", "china", "usa", "usa", "china", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 0, 900, -300})}};

{
/// select * from clerk order by age and salary ASC limit 100;
order_by_items = {MockOrderByItem("and(age, salary)", false)};
func_ast = And(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
}
/// select * from clerk order by age and salary ASC limit 100;
order_by_items = {MockOrderByItem("and(age, salary)", false)};
func_ast = And(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};
request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[0]);
SortInfos sort_infos{{4, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}

{
/// "equal" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{27, 36, 34, 32, {}, {}}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "female", "male", "male", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "korea", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{0, 900, -300, {}, 1300, {}})}};

{
/// select age, salary from clerk order by age = salary DESC limit 100;
order_by_items = {MockOrderByItem("equals(age, salary)", true)};
func_ast = eq(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
}
/// "equal" function
{
/// select age, salary from clerk order by age = salary DESC limit 100;
order_by_items = {MockOrderByItem("equals(age, salary)", true)};
func_ast = eq(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[1]);
SortInfos sort_infos{{4, true}};
executeAndAssertSortedBlocks(request, sort_infos);
}

{
/// "greater" function
expect_cols = {{toNullableVec<Int32>(col_name[0], ColumnWithNullableInt32{{}, 32, {}, 36, 27, 34}),
toNullableVec<String>(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}),
toNullableVec<String>(col_name[2], ColumnWithNullableString{"korea", "usa", "china", "china", "usa", "china"}),
toNullableVec<Int32>(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 900, 0, -300})}};

{
/// select age, gender, country, salary from clerk order by age > salary ASC limit 100;
order_by_items = {MockOrderByItem("greater(age, salary)", false)};
func_ast = gt(col(col_name[0]), col(col_name[3]));
func_projection = {col0_ast, col1_ast, col2_ast, col3_ast, func_ast};

request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projection);
executeAndAssertColumnsEqual(request, expect_cols.back());
request = buildDAGRequest(table_name, order_by_items, 100, func_projection, output_projections[2]);
SortInfos sort_infos{{4, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}

Expand All @@ -251,15 +228,30 @@ try
for (auto limit_num : limits)
{
auto request = context
.scan("test_db", table)
.scan(db_name, table)
.topN("key", false, limit_num)
.build(context);
auto expect = executeStreams(request, 1);
executeAndAssertColumnsEqual(request, expect);
SortInfos sort_infos{{0, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}
}
CATCH

TEST_F(TopNExecutorTestRunner, Empty)
try
{
for (size_t i = 0; i < col_name.size(); ++i)
{
auto request = context
.scan(db_name, empty_name)
.topN(col_name[i], false, 100)
.build(context);
SortInfos sort_infos{{i, false}};
executeAndAssertSortedBlocks(request, sort_infos);
}
}
CATCH

} // namespace tests
} // namespace DB
Loading