From 2ea6253a875192ce6e362d212ddaa1e9e8615ac8 Mon Sep 17 00:00:00 2001 From: yanweiqi <592838129@qq.com> Date: Thu, 22 Dec 2022 11:08:55 +0800 Subject: [PATCH] Test: fix mpp test bug (#6511) ref pingcap/tiflash#4609, close pingcap/tiflash#6442 --- dbms/src/Debug/MockComputeServerManager.cpp | 2 + dbms/src/Debug/MockComputeServerManager.h | 4 +- .../Debug/MockExecutor/AggregationBinder.cpp | 5 +- .../Debug/MockExecutor/AggregationBinder.h | 6 ++- dbms/src/Debug/MockExecutor/AstToPB.cpp | 18 +++++++ dbms/src/Debug/MockExecutor/AstToPB.h | 25 +++------- .../MockExecutor/ExchangeReceiverBinder.cpp | 5 ++ .../MockExecutor/ExchangeReceiverBinder.h | 1 + .../MockExecutor/ExchangeSenderBinder.cpp | 5 ++ .../Debug/MockExecutor/ExchangeSenderBinder.h | 1 + dbms/src/Debug/MockExecutor/ExecutorBinder.h | 10 ++-- dbms/src/Debug/MockExecutor/JoinBinder.cpp | 4 ++ dbms/src/Debug/MockExecutor/JoinBinder.h | 4 +- dbms/src/Debug/MockExecutor/LimitBinder.cpp | 1 + dbms/src/Debug/MockExecutor/ProjectBinder.cpp | 4 ++ .../Debug/MockExecutor/SelectionBinder.cpp | 2 +- dbms/src/Debug/MockExecutor/SelectionBinder.h | 1 - dbms/src/Debug/MockExecutor/SortBinder.cpp | 1 + .../Debug/MockExecutor/TableScanBinder.cpp | 1 + dbms/src/Debug/MockExecutor/TopNBinder.cpp | 2 + dbms/src/Debug/MockExecutor/WindowBinder.cpp | 3 ++ dbms/src/Debug/MockExecutor/WindowBinder.h | 1 - dbms/src/Debug/MockStorage.cpp | 33 +++++++++---- dbms/src/Debug/MockStorage.h | 5 +- dbms/src/Debug/dbgFuncCoprocessor.cpp | 3 ++ dbms/src/Debug/dbgFuncCoprocessor.h | 1 - dbms/src/Debug/dbgFuncCoprocessorUtils.cpp | 13 +++++ dbms/src/Debug/dbgFuncCoprocessorUtils.h | 17 +++---- dbms/src/Debug/dbgNaturalDag.h | 1 + dbms/src/Debug/dbgQueryCompiler.cpp | 27 +++++++++++ dbms/src/Debug/dbgQueryCompiler.h | 23 +-------- dbms/src/Debug/dbgQueryExecutor.cpp | 8 +++- dbms/src/Debug/dbgQueryExecutor.h | 1 - .../Coprocessor/DAGQueryBlockInterpreter.cpp | 12 ++++- .../Flash/Coprocessor/MockSourceStream.cpp | 24 ++++++++++ dbms/src/Flash/Coprocessor/MockSourceStream.h | 28 +++++++---- .../Planner/plans/PhysicalMockTableScan.cpp | 11 ++++- .../Flash/Planner/plans/PhysicalTableScan.cpp | 1 - dbms/src/Flash/tests/gtest_compute_server.cpp | 47 +++++++++++++++++++ dbms/src/Server/FlashGrpcServerHolder.cpp | 1 + dbms/src/TestUtils/ColumnsToTiPBExpr.cpp | 3 +- dbms/src/TestUtils/MPPTaskTestUtils.cpp | 22 +++++++-- dbms/src/TestUtils/MPPTaskTestUtils.h | 14 +++--- dbms/src/TestUtils/mockExecutor.cpp | 29 ++++++++---- dbms/src/TestUtils/mockExecutor.h | 1 + 45 files changed, 319 insertions(+), 112 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/MockSourceStream.cpp diff --git a/dbms/src/Debug/MockComputeServerManager.cpp b/dbms/src/Debug/MockComputeServerManager.cpp index 839cd794b73..64b5c6fc7d4 100644 --- a/dbms/src/Debug/MockComputeServerManager.cpp +++ b/dbms/src/Debug/MockComputeServerManager.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include #include +#include +#include #include #include diff --git a/dbms/src/Debug/MockComputeServerManager.h b/dbms/src/Debug/MockComputeServerManager.h index 6642388659f..dd622e00b70 100644 --- a/dbms/src/Debug/MockComputeServerManager.h +++ b/dbms/src/Debug/MockComputeServerManager.h @@ -15,10 +15,8 @@ #pragma once #include -#include +#include #include -#include -#include namespace DB::tests { diff --git a/dbms/src/Debug/MockExecutor/AggregationBinder.cpp b/dbms/src/Debug/MockExecutor/AggregationBinder.cpp index a39f196a389..7b5d9b9d134 100644 --- a/dbms/src/Debug/MockExecutor/AggregationBinder.cpp +++ b/dbms/src/Debug/MockExecutor/AggregationBinder.cpp @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include #include #include -#include +#include +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/AggregationBinder.h b/dbms/src/Debug/MockExecutor/AggregationBinder.h index 4ece3ff7838..84821594988 100644 --- a/dbms/src/Debug/MockExecutor/AggregationBinder.h +++ b/dbms/src/Debug/MockExecutor/AggregationBinder.h @@ -14,12 +14,14 @@ #pragma once -#include -#include #include +#include namespace DB::mock { +class ExchangeSenderBinder; +class ExchangeReceiverBinder; + class AggregationBinder : public ExecutorBinder { public: diff --git a/dbms/src/Debug/MockExecutor/AstToPB.cpp b/dbms/src/Debug/MockExecutor/AstToPB.cpp index 306d2c24813..fa58e2e3fc8 100644 --- a/dbms/src/Debug/MockExecutor/AstToPB.cpp +++ b/dbms/src/Debug/MockExecutor/AstToPB.cpp @@ -12,7 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { diff --git a/dbms/src/Debug/MockExecutor/AstToPB.h b/dbms/src/Debug/MockExecutor/AstToPB.h index c1560c90355..2f25618c361 100644 --- a/dbms/src/Debug/MockExecutor/AstToPB.h +++ b/dbms/src/Debug/MockExecutor/AstToPB.h @@ -14,24 +14,8 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include namespace DB { @@ -41,6 +25,11 @@ extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; extern const int NO_SUCH_COLUMN_IN_TABLE; } // namespace ErrorCodes + +class ASTFunction; +class ASTIdentifier; +class Context; + struct MPPCtx { Timestamp start_ts; diff --git a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp index 706624856c0..21d4d649ffe 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp @@ -12,8 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include +#include +#include namespace DB::mock { diff --git a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.h b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.h index 2885dfd895d..c2327c87861 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.h +++ b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.h @@ -14,6 +14,7 @@ #pragma once +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp index aaba39868e1..45abb7de9fa 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp @@ -12,8 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include +#include +#include namespace DB::mock { diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h index 0b8b33821cf..ed6710ac22e 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h @@ -14,6 +14,7 @@ #pragma once +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/ExecutorBinder.h b/dbms/src/Debug/MockExecutor/ExecutorBinder.h index de8e3c9928c..d1a03ff96d3 100644 --- a/dbms/src/Debug/MockExecutor/ExecutorBinder.h +++ b/dbms/src/Debug/MockExecutor/ExecutorBinder.h @@ -14,10 +14,13 @@ #pragma once +#include #include -#include #include -#include +#include +#include +#include +#include namespace DB::mock @@ -25,7 +28,6 @@ namespace DB::mock class ExchangeSenderBinder; class ExchangeReceiverBinder; - // Convert CH AST to tipb::Executor // Used in integration test framework and Unit test framework. class ExecutorBinder @@ -45,7 +47,7 @@ class ExecutorBinder index_++; } - std::vector> getChildren() + std::vector> getChildren() const { return children; } diff --git a/dbms/src/Debug/MockExecutor/JoinBinder.cpp b/dbms/src/Debug/MockExecutor/JoinBinder.cpp index 92109b73f1b..df0f11c2133 100644 --- a/dbms/src/Debug/MockExecutor/JoinBinder.cpp +++ b/dbms/src/Debug/MockExecutor/JoinBinder.cpp @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include #include #include +#include +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/JoinBinder.h b/dbms/src/Debug/MockExecutor/JoinBinder.h index 5ab1fb83f4b..c649420b8a9 100644 --- a/dbms/src/Debug/MockExecutor/JoinBinder.h +++ b/dbms/src/Debug/MockExecutor/JoinBinder.h @@ -14,12 +14,12 @@ #pragma once -#include -#include #include namespace DB::mock { +class ExchangeSenderBinder; +class ExchangeReceiverBinder; class JoinBinder : public ExecutorBinder { public: diff --git a/dbms/src/Debug/MockExecutor/LimitBinder.cpp b/dbms/src/Debug/MockExecutor/LimitBinder.cpp index c0a9bf17a82..de90b96f252 100644 --- a/dbms/src/Debug/MockExecutor/LimitBinder.cpp +++ b/dbms/src/Debug/MockExecutor/LimitBinder.cpp @@ -14,6 +14,7 @@ #include #include +#include namespace DB::mock { diff --git a/dbms/src/Debug/MockExecutor/ProjectBinder.cpp b/dbms/src/Debug/MockExecutor/ProjectBinder.cpp index ebe8e5d8bde..50f0646c864 100644 --- a/dbms/src/Debug/MockExecutor/ProjectBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ProjectBinder.cpp @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include +#include #include +#include namespace DB::mock { diff --git a/dbms/src/Debug/MockExecutor/SelectionBinder.cpp b/dbms/src/Debug/MockExecutor/SelectionBinder.cpp index cea52b56922..c3171fa5e2c 100644 --- a/dbms/src/Debug/MockExecutor/SelectionBinder.cpp +++ b/dbms/src/Debug/MockExecutor/SelectionBinder.cpp @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/SelectionBinder.h b/dbms/src/Debug/MockExecutor/SelectionBinder.h index d4270ed5fac..b5e1c2000f3 100644 --- a/dbms/src/Debug/MockExecutor/SelectionBinder.h +++ b/dbms/src/Debug/MockExecutor/SelectionBinder.h @@ -14,7 +14,6 @@ #pragma once -#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/SortBinder.cpp b/dbms/src/Debug/MockExecutor/SortBinder.cpp index 80265448824..1af2820c71e 100644 --- a/dbms/src/Debug/MockExecutor/SortBinder.cpp +++ b/dbms/src/Debug/MockExecutor/SortBinder.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include diff --git a/dbms/src/Debug/MockExecutor/TableScanBinder.cpp b/dbms/src/Debug/MockExecutor/TableScanBinder.cpp index e35a14e4269..27f399f6d40 100644 --- a/dbms/src/Debug/MockExecutor/TableScanBinder.cpp +++ b/dbms/src/Debug/MockExecutor/TableScanBinder.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include diff --git a/dbms/src/Debug/MockExecutor/TopNBinder.cpp b/dbms/src/Debug/MockExecutor/TopNBinder.cpp index f8d7dd5f006..aee74b9300b 100644 --- a/dbms/src/Debug/MockExecutor/TopNBinder.cpp +++ b/dbms/src/Debug/MockExecutor/TopNBinder.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include #include namespace DB::mock diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.cpp b/dbms/src/Debug/MockExecutor/WindowBinder.cpp index 8da8ae5d8ef..0642300cecb 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.cpp +++ b/dbms/src/Debug/MockExecutor/WindowBinder.cpp @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include +#include namespace DB::mock { diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.h b/dbms/src/Debug/MockExecutor/WindowBinder.h index 443506baa33..b9745d3358b 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.h +++ b/dbms/src/Debug/MockExecutor/WindowBinder.h @@ -27,7 +27,6 @@ struct MockWindowFrame std::optional end; // TODO: support calcFuncs }; - using ASTPartitionByElement = ASTOrderByElement; class WindowBinder : public ExecutorBinder diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index dbcf38c831b..7a19da7085b 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. #include +#include namespace DB::tests { @@ -22,8 +23,11 @@ void MockStorage::addTableSchema(const String & name, const MockColumnInfoVec & addTableInfo(name, columnInfos); } -void MockStorage::addTableData(const String & name, const ColumnsWithTypeAndName & columns) +void MockStorage::addTableData(const String & name, ColumnsWithTypeAndName & columns) { + for (size_t i = 0; i < columns.size(); ++i) + columns[i].column_id = i; + table_columns[getTableId(name)] = columns; } @@ -123,11 +127,12 @@ CutColumnInfo getCutColumnInfo(size_t rows, Int64 partition_id, Int64 partition_ return {start, cur_rows}; } -ColumnsWithTypeAndName MockStorage::getColumnsForMPPTableScan(Int64 table_id, Int64 partition_id, Int64 partition_num) +ColumnsWithTypeAndName MockStorage::getColumnsForMPPTableScan(const TiDBTableScan & table_scan, Int64 partition_id, Int64 partition_num) { + auto table_id = table_scan.getLogicalTableID(); if (tableExists(table_id)) { - auto columns_with_type_and_name = table_columns[table_id]; + auto columns_with_type_and_name = table_columns[table_scan.getLogicalTableID()]; size_t rows = 0; for (const auto & col : columns_with_type_and_name) { @@ -141,11 +146,23 @@ ColumnsWithTypeAndName MockStorage::getColumnsForMPPTableScan(Int64 table_id, In ColumnsWithTypeAndName res; for (const auto & column_with_type_and_name : columns_with_type_and_name) { - res.push_back( - ColumnWithTypeAndName( - column_with_type_and_name.column->cut(cut_info.first, cut_info.second), - column_with_type_and_name.type, - column_with_type_and_name.name)); + bool contains = false; + for (const auto & column : table_scan.getColumns()) + { + if (column.id == column_with_type_and_name.column_id) + { + contains = true; + break; + } + } + if (contains) + { + res.push_back( + ColumnWithTypeAndName( + column_with_type_and_name.column->cut(cut_info.first, cut_info.second), + column_with_type_and_name.type, + column_with_type_and_name.name)); + } } return res; } diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index 46e8331602f..ff5ff0627b3 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -13,6 +13,7 @@ // limitations under the License. #pragma once #include +#include #include #include @@ -47,7 +48,7 @@ class MockStorage /// for table scan void addTableSchema(const String & name, const MockColumnInfoVec & columnInfos); - void addTableData(const String & name, const ColumnsWithTypeAndName & columns); + void addTableData(const String & name, ColumnsWithTypeAndName & columns); Int64 getTableId(const String & name); @@ -72,7 +73,7 @@ class MockStorage MockColumnInfoVec getExchangeSchema(const String & exchange_name); /// for MPP Tasks, it will split data by partition num, then each MPP service will have a subset of mock data. - ColumnsWithTypeAndName getColumnsForMPPTableScan(Int64 table_id, Int64 partition_id, Int64 partition_num); + ColumnsWithTypeAndName getColumnsForMPPTableScan(const TiDBTableScan & table_scan, Int64 partition_id, Int64 partition_num); TableInfo getTableInfo(const String & name); diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 07ee8703b92..112f43b568b 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -15,7 +15,10 @@ #include #include #include +#include +#include #include +#include namespace DB { diff --git a/dbms/src/Debug/dbgFuncCoprocessor.h b/dbms/src/Debug/dbgFuncCoprocessor.h index a296e93d410..9a21842fa50 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.h +++ b/dbms/src/Debug/dbgFuncCoprocessor.h @@ -15,7 +15,6 @@ #pragma once #include -#include namespace DB { class Context; diff --git a/dbms/src/Debug/dbgFuncCoprocessorUtils.cpp b/dbms/src/Debug/dbgFuncCoprocessorUtils.cpp index b1dd70feba7..e89163c2c1d 100644 --- a/dbms/src/Debug/dbgFuncCoprocessorUtils.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessorUtils.cpp @@ -12,8 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include #include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include namespace DB { diff --git a/dbms/src/Debug/dbgFuncCoprocessorUtils.h b/dbms/src/Debug/dbgFuncCoprocessorUtils.h index 7d9ca5a1075..0f2c3d85533 100644 --- a/dbms/src/Debug/dbgFuncCoprocessorUtils.h +++ b/dbms/src/Debug/dbgFuncCoprocessorUtils.h @@ -13,19 +13,10 @@ // limitations under the License. #pragma once -#include -#include -#include -#include -#include -#include +#include #include -#include -#include #include -#include -#include -#include +#include namespace DB { @@ -33,6 +24,10 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; } +class Context; +struct DAGProperties; +class IBlockInputStream; +using BlockInputStreamPtr = std::shared_ptr; std::unique_ptr getCodec(tipb::EncodeType encode_type); DAGSchema getSelectSchema(Context & context); diff --git a/dbms/src/Debug/dbgNaturalDag.h b/dbms/src/Debug/dbgNaturalDag.h index f7c1d850ebe..67c7dca288e 100644 --- a/dbms/src/Debug/dbgNaturalDag.h +++ b/dbms/src/Debug/dbgNaturalDag.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Debug/dbgQueryCompiler.cpp b/dbms/src/Debug/dbgQueryCompiler.cpp index 2562e6b2efc..f9e58b1a424 100644 --- a/dbms/src/Debug/dbgQueryCompiler.cpp +++ b/dbms/src/Debug/dbgQueryCompiler.cpp @@ -12,7 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { diff --git a/dbms/src/Debug/dbgQueryCompiler.h b/dbms/src/Debug/dbgQueryCompiler.h index 748b14d41e8..87397ab0728 100644 --- a/dbms/src/Debug/dbgQueryCompiler.h +++ b/dbms/src/Debug/dbgQueryCompiler.h @@ -16,32 +16,10 @@ #include #include -#include #include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include namespace DB @@ -49,6 +27,7 @@ namespace DB using MakeResOutputStream = std::function; using ExecutorBinderPtr = mock::ExecutorBinderPtr; using TableInfo = TiDB::TableInfo; +struct ASTTablesInSelectQueryElement; enum class QueryTaskType { diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index be7ee9b9ca6..359aa833f25 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -12,12 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include - +#include +#include +#include +#include namespace DB { +using TiFlashTestEnv = tests::TiFlashTestEnv; + void setTipbRegionInfo(coprocessor::RegionInfo * tipb_region_info, const std::pair & region, TableID table_id) { tipb_region_info->set_region_id(region.first); diff --git a/dbms/src/Debug/dbgQueryExecutor.h b/dbms/src/Debug/dbgQueryExecutor.h index 0b3c639a20c..aa308ada9fc 100644 --- a/dbms/src/Debug/dbgQueryExecutor.h +++ b/dbms/src/Debug/dbgQueryExecutor.h @@ -18,7 +18,6 @@ namespace DB { using MockServerConfig = tests::MockServerConfig; -using TiFlashTestEnv = tests::TiFlashTestEnv; BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream); BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & properties, QueryTasks & query_tasks); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2902d66b57a..34fd53c3455 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -176,7 +176,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s } else { - auto [names_and_types, mock_table_scan_streams] = mockSourceStream(context, max_streams, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID()); + NamesAndTypes names_and_types; + std::vector> mock_table_scan_streams; + if (context.isMPPTest()) + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); + } + else + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID()); + } + analyzer = std::make_unique(std::move(names_and_types), context); pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); } diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.cpp b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp new file mode 100644 index 00000000000..c8e662adc32 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +std::pair>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan) +{ + ColumnsWithTypeAndName columns_with_type_and_name = context.mockStorage().getColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num); + return cutStreams(context, columns_with_type_and_name, max_streams, log); +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.h b/dbms/src/Flash/Coprocessor/MockSourceStream.h index 7cb0ffc95e7..c84d37d2a06 100644 --- a/dbms/src/Flash/Coprocessor/MockSourceStream.h +++ b/dbms/src/Flash/Coprocessor/MockSourceStream.h @@ -18,25 +18,19 @@ #include #include #include +#include #include +#include + namespace DB { - template -std::pair>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id, Int64 table_id = 0) +std::pair>> cutStreams(Context & context, ColumnsWithTypeAndName & columns_with_type_and_name, size_t max_streams, DB::LoggerPtr log) { - ColumnsWithTypeAndName columns_with_type_and_name; NamesAndTypes names_and_types; size_t rows = 0; std::vector> mock_source_streams; - if constexpr (std::is_same_v) - columns_with_type_and_name = context.mockStorage().getExchangeColumns(executor_id); - else if (context.isMPPTest()) - columns_with_type_and_name = context.mockStorage().getColumnsForMPPTableScan(table_id, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num); - else - columns_with_type_and_name = context.mockStorage().getColumns(table_id); - for (const auto & col : columns_with_type_and_name) { if (rows == 0) @@ -68,4 +62,18 @@ std::pair>> mockSourceStr RUNTIME_ASSERT(start == rows, log, "mock source streams' total size must same as user input"); return {names_and_types, mock_source_streams}; } + +std::pair>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan); + +template +std::pair>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id, Int64 table_id = 0) +{ + ColumnsWithTypeAndName columns_with_type_and_name; + if constexpr (std::is_same_v) + columns_with_type_and_name = context.mockStorage().getExchangeColumns(executor_id); + else + columns_with_type_and_name = context.mockStorage().getColumns(table_id); + + return cutStreams(context, columns_with_type_and_name, max_streams, log); +} } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp index c3019f5ed8b..23833b9af0b 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp @@ -50,7 +50,16 @@ std::pair mockSchemaAndStreams( else { /// build from user input blocks. - auto [names_and_types, mock_table_scan_streams] = mockSourceStream(context, max_streams, log, executor_id, table_scan.getLogicalTableID()); + NamesAndTypes names_and_types; + std::vector> mock_table_scan_streams; + if (context.isMPPTest()) + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); + } + else + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, executor_id, table_scan.getLogicalTableID()); + } schema = std::move(names_and_types); mock_streams.insert(mock_streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); } diff --git a/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp index 06710f4dc98..ab8c60a4ba3 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 264db3ea876..e90e88f2289 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -290,6 +290,53 @@ try } CATCH +TEST_F(ComputeServerRunner, aggWithColumnPrune) +try +{ + startServers(3); + + context.addMockTable( + {"test_db", "test_table_2"}, + {{"i1", TiDB::TP::TypeLong}, {"i2", TiDB::TP::TypeLong}, {"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}, {"s4", TiDB::TP::TypeString}, {"s5", TiDB::TP::TypeString}}, + {toNullableVec("i1", {0, 0, 0}), toNullableVec("i2", {1, 1, 1}), toNullableVec("s1", {"1", "9", "8"}), toNullableVec("s2", {"1", "9", "8"}), toNullableVec("s3", {"4", "9", "99"}), toNullableVec("s4", {"4", "9", "999"}), toNullableVec("s5", {"4", "9", "9999"})}); + std::vector res{"9", "9", "99", "999", "9999"}; + std::vector max_cols{"s1", "s2", "s3", "s4", "s5"}; + for (size_t i = 0; i < 1; ++i) + { + { + auto request = context + .scan("test_db", "test_table_2") + .aggregation({Max(col(max_cols[i]))}, {col("i1")}); + auto expected_cols = { + toNullableVec({res[i]}), + toNullableVec({{0}})}; + ASSERT_COLUMNS_EQ_UR(expected_cols, buildAndExecuteMPPTasks(request)); + } + + { + auto request = context + .scan("test_db", "test_table_2") + .aggregation({Max(col(max_cols[i]))}, {col("i2")}); + auto expected_cols = { + toNullableVec({res[i]}), + toNullableVec({{1}})}; + ASSERT_COLUMNS_EQ_UR(expected_cols, buildAndExecuteMPPTasks(request)); + } + + { + auto request = context + .scan("test_db", "test_table_2") + .aggregation({Max(col(max_cols[i]))}, {col("i1"), col("i2")}); + auto expected_cols = { + toNullableVec({res[i]}), + toNullableVec({{0}}), + toNullableVec({{1}})}; + ASSERT_COLUMNS_EQ_UR(expected_cols, buildAndExecuteMPPTasks(request)); + } + } +} +CATCH + TEST_F(ComputeServerRunner, cancelAggTasks) try { diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 1aeff0a49c6..0ab13e2dd85 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include diff --git a/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp b/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp index af8c8bed4ba..2c993ec91ea 100644 --- a/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp +++ b/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp @@ -13,13 +13,14 @@ // limitations under the License. #include +#include #include #include #include +#include #include #include - namespace DB { namespace tests diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index 6d7275a19a0..d33ae8e5910 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -11,7 +11,12 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include #include +#include #include namespace DB::tests @@ -80,7 +85,7 @@ std::tuple> MPPTaskTestUtils::prepa return {MPPQueryId(properties.query_ts, properties.local_query_id, properties.server_id, properties.start_ts), res}; } -ColumnsWithTypeAndName MPPTaskTestUtils::exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map) +ColumnsWithTypeAndName MPPTaskTestUtils::executeMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map) { auto res = executeMPPQueryWithMultipleContext(properties, tasks, server_config_map); return readBlocks(res); @@ -145,8 +150,8 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(const MPPQuery { std::this_thread::sleep_for(seconds); retry_times++; - // Currenly we wait for 10 times to ensure all tasks are cancelled. - if (retry_times > 10) + // Currenly we wait for 20 times to ensure all tasks are cancelled. + if (retry_times > 20) { return ::testing::AssertionFailure() << "Query not cancelled, " << queryInfo(i) << std::endl; } @@ -166,4 +171,15 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(const MPPQueryId } return ::testing::AssertionSuccess(); } + +ColumnsWithTypeAndName MPPTaskTestUtils::buildAndExecuteMPPTasks(DAGRequestBuilder builder) +{ + auto properties = DB::tests::getDAGPropertiesForTest(serverNum()); + for (int i = 0; i < TiFlashTestEnv::globalContextSize(); ++i) + TiFlashTestEnv::getGlobalContext(i).setMPPTest(); + auto tasks = (builder).buildMPPTasks(context, properties); + MockComputeServerManager::instance().resetMockMPPServerInfo(serverNum()); + MockComputeServerManager::instance().setMockStorage(context.mockStorage()); + return executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap()); +} } // namespace DB::tests diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.h b/dbms/src/TestUtils/MPPTaskTestUtils.h index cb0e84a2a14..75330ed0c6d 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.h +++ b/dbms/src/TestUtils/MPPTaskTestUtils.h @@ -15,11 +15,7 @@ #pragma once #include -#include -#include -#include #include -#include namespace DB::tests { @@ -50,12 +46,12 @@ class MockServerAddrGenerator : public ext::Singleton void reset() { - port = 3931; + port = 4931; } private: const Int64 port_upper_bound = 65536; - std::atomic port = 3931; + std::atomic port = 4931; }; // Hold MPP test related infomation: @@ -82,12 +78,14 @@ class MPPTaskTestUtils : public ExecutorTest // run mpp tasks which are ready to cancel, the return value is the start_ts of query. std::tuple> prepareMPPStreams(DAGRequestBuilder builder); - ColumnsWithTypeAndName exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map); + static ColumnsWithTypeAndName executeMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map); + ColumnsWithTypeAndName buildAndExecuteMPPTasks(DAGRequestBuilder builder); ColumnsWithTypeAndName executeCoprocessorTask(std::shared_ptr & dag_request); static ::testing::AssertionResult assertQueryCancelled(const MPPQueryId & query_id); static ::testing::AssertionResult assertQueryActive(const MPPQueryId & query_id); + static String queryInfo(size_t server_id); protected: @@ -101,7 +99,7 @@ class MPPTaskTestUtils : public ExecutorTest { \ TiFlashTestEnv::getGlobalContext().setMPPTest(); \ MockComputeServerManager::instance().setMockStorage(context.mockStorage()); \ - ASSERT_COLUMNS_EQ_UR(exeucteMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap()), expected_cols); \ + ASSERT_COLUMNS_EQ_UR(expected_cols, executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap())); \ } while (0) diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index e6c9a82a231..59000185cdf 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -13,6 +13,17 @@ // limitations under the License. #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -261,18 +272,19 @@ DAGRequestBuilder & DAGRequestBuilder::exchangeSender(tipb::ExchangeType exchang return *this; } -DAGRequestBuilder & DAGRequestBuilder::join(const DAGRequestBuilder & right, - tipb::JoinType tp, - MockAstVec join_cols, - MockAstVec left_conds, - MockAstVec right_conds, - MockAstVec other_conds, - MockAstVec other_eq_conds_from_in) +DAGRequestBuilder & DAGRequestBuilder::join( + const DAGRequestBuilder & right, + tipb::JoinType tp, + MockAstVec join_col_exprs, + MockAstVec left_conds, + MockAstVec right_conds, + MockAstVec other_conds, + MockAstVec other_eq_conds_from_in) { assert(root); assert(right.root); - root = mock::compileJoin(getExecutorIndex(), root, right.root, tp, join_cols, left_conds, right_conds, other_conds, other_eq_conds_from_in); + root = mock::compileJoin(getExecutorIndex(), root, right.root, tp, join_col_exprs, left_conds, right_conds, other_conds, other_eq_conds_from_in); return *this; } @@ -385,6 +397,7 @@ void MockDAGRequestContext::addMockTable(const String & db, const String & table void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) { + assert(columnInfos.size() == columns.size()); addMockTable(name, columnInfos); addMockTableColumnData(name, columns); } diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 11c09caf4cf..14b314d9c20 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include