diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index 5fbd86e9762..0d7d711846d 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -87,19 +87,30 @@ String getAggFuncName( return agg_func_name; } -/// return `duplicated agg function`->getReturnType if duplicated. +/// return `duplicated Agg/Window function`->getReturnType if duplicated. /// or not return nullptr. -DataTypePtr findDuplicateAggFunc( +template +DataTypePtr findDuplicateAggWindowFunc( const String & func_string, - const AggregateDescriptions & aggregate_descriptions) + const Descriptions & descriptions) { - for (const auto & aggregated : aggregate_descriptions) + for (const auto & description : descriptions) { - if (aggregated.column_name == func_string) + if (description.column_name == func_string) { - auto return_type = aggregated.function->getReturnType(); - assert(return_type); - return return_type; + if constexpr (std::is_same_v) + { + auto return_type = description.function->getReturnType(); + assert(return_type); + return return_type; + } + else + { + static_assert(std::is_same_v); + auto return_type = description.window_function->getReturnType(); + assert(return_type); + return return_type; + } } } return nullptr; @@ -121,7 +132,7 @@ void appendAggDescription( AggregateDescription aggregate; aggregate.argument_names = arg_names; String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators); - if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions)) + if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, aggregate_descriptions)) { // agg function duplicate, don't need to build again. aggregated_columns.emplace_back(func_string, duplicated_return_type); @@ -210,7 +221,7 @@ void DAGExpressionAnalyzer::buildGroupConcat( String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators); /// return directly if the agg is duplicated - if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions)) + if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, aggregate_descriptions)) { aggregated_columns.emplace_back(func_string, duplicated_return_type); return; @@ -448,9 +459,8 @@ void DAGExpressionAnalyzer::appendSourceColumnsToRequireOutput(ExpressionActions } // This function will add new window function culumns to source_column -std::tuple DAGExpressionAnalyzer::appendWindowColumns(const tipb::Window & window, ExpressionActionsChain::Step & step) +void DAGExpressionAnalyzer::appendWindowColumns(WindowDescription & window_description, const tipb::Window & window, ExpressionActionsChain::Step & step) { - WindowDescription window_description; NamesAndTypes window_columns; if (window.func_desc_size() == 0) @@ -472,10 +482,8 @@ std::tuple DAGExpressionAnalyzer::appendWindow } else if (isWindowFunctionExpr(expr)) { - WindowFunctionDescription window_function_description; String window_func_name = getWindowFunctionName(expr); auto child_size = expr.children_size(); - Names arg_names; DataTypes arg_types; TiDB::TiDBCollators arg_collators; @@ -484,11 +492,19 @@ std::tuple DAGExpressionAnalyzer::appendWindow fillArgumentDetail(step.actions, expr.children(i), arg_names, arg_types, arg_collators); } + String func_string = genFuncString(window_func_name, arg_names, arg_collators); + if (auto duplicated_return_type = findDuplicateAggWindowFunc(func_string, window_description.window_functions_descriptions)) + { + // window function duplicate, don't need to build again. + source_columns.emplace_back(func_string, duplicated_return_type); + continue; + } + + WindowFunctionDescription window_function_description; window_function_description.argument_names.resize(child_size); window_function_description.argument_names = arg_names; step.required_output.insert(step.required_output.end(), arg_names.begin(), arg_names.end()); - String func_string = genFuncString(window_func_name, window_function_description.argument_names, arg_collators); window_function_description.column_name = func_string; window_function_description.window_function = WindowFunctionFactory::instance().get(window_func_name, arg_types); DataTypePtr result_type = window_function_description.window_function->getReturnType(); @@ -502,7 +518,7 @@ std::tuple DAGExpressionAnalyzer::appendWindow } } - return {window_description, window_columns}; + window_description.add_columns = window_columns; } WindowDescription DAGExpressionAnalyzer::buildWindowDescription(const tipb::Window & window) @@ -512,18 +528,17 @@ WindowDescription DAGExpressionAnalyzer::buildWindowDescription(const tipb::Wind appendSourceColumnsToRequireOutput(step); size_t source_size = getCurrentInputColumns().size(); - auto [window_description, window_columns] = appendWindowColumns(window, step); - - window_description.add_columns = window_columns; - + WindowDescription window_description; + window_description.partition_by = getWindowSortDescription(window.partition_by()); + window_description.order_by = getWindowSortDescription(window.order_by()); if (window.has_frame()) { window_description.setWindowFrame(window.frame()); } + appendWindowColumns(window_description, window, step); + window_description.before_window = chain.getLastActions(); - window_description.partition_by = getWindowSortDescription(window.partition_by()); - window_description.order_by = getWindowSortDescription(window.order_by()); chain.finalize(); chain.clear(); diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index c42312b95c3..f3fd429b104 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -83,7 +83,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable const tipb::Aggregation & agg, bool group_by_collation_sensitive); - std::tuple appendWindowColumns(const tipb::Window & window, ExpressionActionsChain::Step & step); + void appendWindowColumns(WindowDescription & window_description, const tipb::Window & window, ExpressionActionsChain::Step & step); WindowDescription buildWindowDescription(const tipb::Window & window); diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp index 923743cdae8..58a6c0e45e3 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp @@ -36,4 +36,31 @@ ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Conte } return std::make_shared(actions_input_column, context.getSettingsRef()); } + +NamesAndTypes addSchemaProjectAction( + const ExpressionActionsPtr & expr_actions, + const NamesAndTypes & before_schema, + const String & column_prefix) +{ + assert(expr_actions); + assert(!before_schema.empty()); + + NamesAndTypes after_schema = before_schema; + NamesWithAliases project_aliases; + std::unordered_set column_name_set; + for (size_t i = 0; i < before_schema.size(); ++i) + { + const auto & before_column_name = before_schema[i].name; + String after_column_name = column_prefix + before_column_name; + /// Duplicate columns don‘t need to project. + if (column_name_set.find(before_column_name) == column_name_set.end()) + { + project_aliases.emplace_back(before_column_name, after_column_name); + column_name_set.emplace(before_column_name); + } + after_schema[i].name = after_column_name; + } + expr_actions->add(ExpressionAction::project(project_aliases)); + return after_schema; +} } // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.h b/dbms/src/Flash/Planner/PhysicalPlanHelper.h index 89c1963feaa..9fdb8b9b3c4 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.h +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.h @@ -22,4 +22,9 @@ namespace DB::PhysicalPlanHelper ExpressionActionsPtr newActions(const Block & input_block, const Context & context); ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Context & context); + +NamesAndTypes addSchemaProjectAction( + const ExpressionActionsPtr & expr_actions, + const NamesAndTypes & before_schema, + const String & column_prefix = ""); } // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp index 58c79f43951..f40d8f18aa1 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp @@ -69,8 +69,7 @@ PhysicalPlanNodePtr PhysicalAggregation::build( analyzer.reset(aggregated_columns); analyzer.appendCastAfterAgg(expr_after_agg_actions, aggregation); /// project action after aggregation to remove useless columns. - const NamesAndTypes & schema = analyzer.getCurrentInputColumns(); - expr_after_agg_actions->add(ExpressionAction::project(DB::toNames(schema))); + auto schema = PhysicalPlanHelper::addSchemaProjectAction(expr_after_agg_actions, analyzer.getCurrentInputColumns()); auto physical_agg = std::make_shared( executor_id, diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp index c1a7fc2727f..03fd2804fb3 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp @@ -62,6 +62,8 @@ PhysicalPlanNodePtr PhysicalExchangeReceiver::build( void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) { + assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty()); + auto & dag_context = *context.getDAGContext(); // todo choose a more reasonable stream number auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id]; diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp index 1a990a6a2d4..7a4566eefd0 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp @@ -93,6 +93,7 @@ PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build( void PhysicalMockExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) { + assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty()); pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); } diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp index a47c5895a6a..d390d6c5e05 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp @@ -93,6 +93,7 @@ PhysicalPlanNodePtr PhysicalMockTableScan::build( void PhysicalMockTableScan::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) { + assert(pipeline.streams.empty() && pipeline.streams_with_non_joined_data.empty()); pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); } diff --git a/dbms/src/Flash/Planner/plans/PhysicalWindow.cpp b/dbms/src/Flash/Planner/plans/PhysicalWindow.cpp index c3dfc48faaf..f6a45ba4d9b 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalWindow.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalWindow.cpp @@ -43,8 +43,13 @@ PhysicalPlanNodePtr PhysicalWindow::build( WindowDescription window_description = analyzer.buildWindowDescription(window); /// project action after window to remove useless columns. - const auto & schema = window_description.after_window_columns; - window_description.after_window->add(ExpressionAction::project(DB::toNames(schema))); + /// For window, we need to add column_prefix to distinguish it from the output of the next window. + /// such as `window(row_number()) <-- window(row_number())`. + auto schema = PhysicalPlanHelper::addSchemaProjectAction( + window_description.after_window, + window_description.after_window_columns, + fmt::format("{}_", executor_id)); + window_description.after_window_columns = schema; auto physical_window = std::make_shared( executor_id, @@ -78,7 +83,7 @@ void PhysicalWindow::transformImpl(DAGPipeline & pipeline, Context & context, si pipeline.firstStream() = std::make_shared(pipeline.firstStream(), window_description, log->identifier()); } - executeExpression(pipeline, window_description.after_window, log, "cast after window"); + executeExpression(pipeline, window_description.after_window, log, "expr after window"); } void PhysicalWindow::finalize(const Names & parent_require) diff --git a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp index 20ec1150276..1d0704f692b 100644 --- a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp +++ b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp @@ -295,13 +295,13 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_tidb_operator: false, schema: , , - | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , | is_tidb_operator: true, schema: , | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: - Expression: + Expression: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting, limit = 0 PartialSorting: limit = 0 @@ -314,13 +314,13 @@ Expression: execute( request, /*expected_physical_plan=*/R"( - | is_tidb_operator: false, schema: , , - | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , | is_tidb_operator: true, schema: , | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: - Expression: + Expression: Window: , function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting: , limit = 0 PartialSorting: : limit = 0 diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index e3deeab3129..3f1766769f5 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -105,6 +105,11 @@ class ExecutorAggTestRunner : public DB::tests::ExecutorTest {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, {toNullableVec("s1", {"banana", {}, "banana"}), toNullableVec("s2", {"apple", {}, "banana"})}); + + context.addMockTable({"test_db", "test_table"}, + {{"s1", TiDB::TP::TypeLongLong}, {"s2", TiDB::TP::TypeLongLong}}, + {toVec("s1", {1, 2, 3}), + toVec("s2", {1, 2, 3})}); } std::shared_ptr buildDAGRequest(std::pair src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj) @@ -317,22 +322,38 @@ try .scan("aggnull_test", "t1") .aggregation({Max(col("s1"))}, {}) .build(context); - { - ASSERT_COLUMNS_EQ_R(executeStreams(request), - createColumns({toNullableVec({"banana"})})); - } + executeWithConcurrency(request, {{toNullableVec({"banana"})}}); request = context .scan("aggnull_test", "t1") .aggregation({}, {col("s1")}) .build(context); - { - ASSERT_COLUMNS_EQ_UR(executeStreams(request), - createColumns({toNullableVec("s1", {{}, "banana"})})); - } + executeWithConcurrency(request, {{toNullableVec("s1", {{}, "banana"})}}); } CATCH +TEST_F(ExecutorAggTestRunner, RepeatedAggregateFunction) +try +{ + /// select max(s1), max(s1) from test_db.test_table; + auto request = context + .scan("test_db", "test_table") + .aggregation({Max(col("s1")), Max(col("s1"))}, {}) + .build(context); + executeWithConcurrency( + request, + {{toNullableVec({3})}, {toNullableVec({3})}}); + + /// select max(s1), max(s1), sum(s2) from test_db.test_table; + request = context + .scan("test_db", "test_table") + .aggregation({Max(col("s1")), Max(col("s1")), Sum(col("s2"))}, {}) + .build(context); + executeWithConcurrency( + request, + {{toNullableVec({3})}, {toNullableVec({3})}, {toVec({6})}}); +} +CATCH // TODO support more type of min, max, count. // support more aggregation functions: sum, forst_row, group_concat diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 4085ef7a2e2..038af91f725 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -614,7 +614,7 @@ try Union: Expression x 10: SharedQuery: - Expression: + Expression: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting, limit = 0 Union: @@ -634,7 +634,7 @@ Union: Expression x 10: Expression: SharedQuery: - Expression: + Expression: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting, limit = 0 Union: @@ -655,7 +655,7 @@ Union: Expression x 10: Expression: SharedQuery: - Expression: + Expression: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} Union: Expression x 10: @@ -684,7 +684,7 @@ try String expected = R"( Union: Expression x 10: - Expression: + Expression: Window: , function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting: , limit = 0 PartialSorting: : limit = 0 @@ -719,7 +719,7 @@ Union: Union: Expression x 10: SharedQuery: - Expression: + Expression: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} MergeSorting, limit = 0 Union: diff --git a/dbms/src/Flash/tests/gtest_projection_executor.cpp b/dbms/src/Flash/tests/gtest_projection_executor.cpp index 3cb458d1045..354fd3747a7 100644 --- a/dbms/src/Flash/tests/gtest_projection_executor.cpp +++ b/dbms/src/Flash/tests/gtest_projection_executor.cpp @@ -49,10 +49,12 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest return context.scan(db_name, table_name).project(param).build(context); }; + static const size_t max_concurrency_level = 10; + void executeWithConcurrency(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns) { WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN - for (size_t i = 1; i < 10; i += 2) + for (size_t i = 1; i <= max_concurrency_level; i += 2) { ASSERT_COLUMNS_EQ_UR(executeStreams(request, i), expect_columns); } diff --git a/dbms/src/Flash/tests/gtest_window_executor.cpp b/dbms/src/Flash/tests/gtest_window_executor.cpp index d7f796ffee9..2a2260ada03 100644 --- a/dbms/src/Flash/tests/gtest_window_executor.cpp +++ b/dbms/src/Flash/tests/gtest_window_executor.cpp @@ -63,7 +63,8 @@ class WindowExecutorTestRunner : public DB::tests::ExecutorTest void executeWithConcurrency(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns) { WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN - for (size_t i = 1; i <= max_concurrency_level; ++i) + ASSERT_COLUMNS_EQ_R(expect_columns, executeStreams(request)); + for (size_t i = 2; i <= max_concurrency_level; ++i) { ASSERT_COLUMNS_EQ_UR(expect_columns, executeStreams(request, i)); } @@ -73,7 +74,8 @@ class WindowExecutorTestRunner : public DB::tests::ExecutorTest void executeWithTableScanAndConcurrency(const std::shared_ptr & request, const ColumnsWithTypeAndName & source_columns, const ColumnsWithTypeAndName & expect_columns) { WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN - for (size_t i = 1; i <= max_concurrency_level; ++i) + ASSERT_COLUMNS_EQ_R(expect_columns, executeStreamsWithSingleSource(request, source_columns, SourceType::TableScan)); + for (size_t i = 2; i <= max_concurrency_level; ++i) { ASSERT_COLUMNS_EQ_UR(expect_columns, executeStreamsWithSingleSource(request, source_columns, SourceType::TableScan)); } @@ -215,4 +217,170 @@ try } CATCH +TEST_F(WindowExecutorTestRunner, multiWindow) +try +{ + std::vector functions = {DenseRank(), Rank()}; + ColumnsWithTypeAndName functions_result = {toNullableVec("dense_rank", {1, 1, 2, 2, 1, 1, 2, 2}), toNullableVec("rank", {1, 1, 3, 3, 1, 1, 3, 3})}; + auto test_single_window_function = [&](size_t index) { + auto request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(functions[index], {"order", false}, {"partition", false}, MockWindowFrame{}) + .build(context); + executeWithConcurrency(request, + createColumns({toNullableVec("partition", {1, 1, 1, 1, 2, 2, 2, 2}), + toNullableVec("order", {1, 1, 2, 2, 1, 1, 2, 2}), + functions_result[index]})); + }; + for (size_t i = 0; i < functions.size(); ++i) + test_single_window_function(i); + + auto gen_merge_window_request = [&](const std::vector & wfs) { + return context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(wfs, {{"order", false}}, {{"partition", false}}, MockWindowFrame()) + .build(context); + }; + + auto gen_split_window_request = [&](const std::vector & wfs) { + auto req = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true); + for (const auto & wf : wfs) + req.window(wf, {"order", false}, {"partition", false}, MockWindowFrame()); + return req.build(context); + }; + + std::vector wfs; + ColumnsWithTypeAndName wfs_result = {{toNullableVec("partition", {1, 1, 1, 1, 2, 2, 2, 2})}, {toNullableVec("order", {1, 1, 2, 2, 1, 1, 2, 2})}}; + for (size_t i = 0; i < functions.size(); ++i) + { + wfs.push_back(functions[i]); + wfs_result.push_back(functions_result[i]); + for (size_t j = 0; j < functions.size(); ++j) + { + wfs.push_back(functions[j]); + wfs_result.push_back(functions_result[j]); + for (size_t k = 0; k < functions.size(); ++k) + { + wfs.push_back(functions[k]); + wfs_result.push_back(functions_result[k]); + + executeWithConcurrency(gen_merge_window_request(wfs), wfs_result); + executeWithConcurrency(gen_split_window_request(wfs), wfs_result); + + wfs.pop_back(); + wfs_result.pop_back(); + } + wfs.pop_back(); + wfs_result.pop_back(); + } + wfs.pop_back(); + wfs_result.pop_back(); + } +} +CATCH + +TEST_F(WindowExecutorTestRunner, multiWindowThenAgg) +try +{ + /* + select count(1) from ( + SELECT + ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), + ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order` DESC) + FROM `test_db`.`test_table` + )t1; + */ + auto request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame()) + .sort({{"partition", false}, {"order", true}}, true) + .window(RowNumber(), {"order", true}, {"partition", false}, buildDefaultRowsFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + /* + select count(1) from ( + SELECT + ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), + ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`) + FROM `test_db`.`test_table` + )t1; + */ + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame()) + .window(RowNumber(), {"order", false}, {"partition", false}, buildDefaultRowsFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window({RowNumber(), RowNumber()}, {{"order", false}}, {{"partition", false}}, buildDefaultRowsFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + /* + select count(1) from ( + SELECT + Rank() OVER (PARTITION BY `partition` ORDER BY `order`), + DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`) + FROM `test_db`.`test_table` + )t1; + */ + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(Rank(), {"order", false}, {"partition", false}, MockWindowFrame()) + .window(DenseRank(), {"order", false}, {"partition", false}, MockWindowFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window({Rank(), DenseRank()}, {{"order", false}}, {{"partition", false}}, MockWindowFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + /* + select count(1) from ( + SELECT + DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`), + DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`), + Rank() OVER (PARTITION BY `partition` ORDER BY `order`) + FROM `test_db`.`test_table` + )t1; + */ + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window({DenseRank(), DenseRank(), Rank()}, {{"order", false}}, {{"partition", false}}, MockWindowFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); + + request = context + .scan("test_db", "test_table") + .sort({{"partition", false}, {"order", false}}, true) + .window(DenseRank(), {"order", false}, {"partition", false}, MockWindowFrame()) + .window(DenseRank(), {"order", false}, {"partition", false}, MockWindowFrame()) + .window(Rank(), {"order", false}, {"partition", false}, MockWindowFrame()) + .aggregation({Count(lit(Field(static_cast(1))))}, {}) + .build(context); + executeWithConcurrency(request, createColumns({toVec({8})})); +} +CATCH + } // namespace DB::tests diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index 698d39ef2ea..f3e04a15c36 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -16,7 +16,12 @@ mysql> drop table if exists test.t1; mysql> create table test.t1(c1 int, c2 int); mysql> insert into test.t1 values(1, 1),(2, 2),(3, 3),(1, 1),(2, 2),(3, 3),(4, 4); mysql> alter table test.t1 set tiflash replica 1; +mysql> drop table if exists test.t2; +mysql> CREATE TABLE `test`.`t2` (`c1` bigint(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL); +mysql> insert into test.t2 values(1, 'a'), (1, 'a'), (2, 'a'), (2, 'a'), (3, 'b'), (3, 'b'), (4, 'b'), (4, 'b'); +mysql> alter table test.t2 set tiflash replica 1; func> wait_table test t1 +func> wait_table test t2 mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select c1, c2, row_number() over w2, row_number() over w1 from test.t1 window w1 as(partition by c1), w2 as (partition by c1, c2) order by 1, 2, 3, 4; +------+------+----------------------+----------------------+ | c1 | c2 | row_number() over w2 | row_number() over w1 | @@ -29,4 +34,24 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select c1, c2, row | 3 | 3 | 2 | 2 | | 4 | 4 | 1 | 1 | +------+------+----------------------+----------------------+ +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; SELECT Rank() OVER (PARTITION BY c2 ORDER BY c1), Rank() OVER (PARTITION BY c2 ORDER BY c1), Dense_Rank() OVER (PARTITION BY c2 ORDER BY c1), Dense_Rank() OVER (PARTITION BY c2 ORDER BY c1), ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) FROM test.t2; ++-------------------------------------------+-------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+ +| Rank() OVER (PARTITION BY c2 ORDER BY c1) | Rank() OVER (PARTITION BY c2 ORDER BY c1) | Dense_Rank() OVER (PARTITION BY c2 ORDER BY c1) | Dense_Rank() OVER (PARTITION BY c2 ORDER BY c1) | ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) | ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) | ++-------------------------------------------+-------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+ +| 1 | 1 | 1 | 1 | 1 | 1 | +| 1 | 1 | 1 | 1 | 2 | 2 | +| 3 | 3 | 2 | 2 | 3 | 3 | +| 3 | 3 | 2 | 2 | 4 | 4 | +| 1 | 1 | 1 | 1 | 1 | 1 | +| 1 | 1 | 1 | 1 | 2 | 2 | +| 3 | 3 | 2 | 2 | 3 | 3 | +| 3 | 3 | 2 | 2 | 4 | 4 | ++-------------------------------------------+-------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+ +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select count(*) from (SELECT Rank() OVER (PARTITION BY c2 ORDER BY c1), Dense_Rank() OVER (PARTITION BY c2 ORDER BY c1), ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) FROM test.t2) t1; ++----------+ +| count(*) | ++----------+ +| 8 | ++----------+ mysql> drop table if exists test.t1; +mysql> drop table if exists test.t2;