From 78fa108a01aa1e2f45bccc5d80ea9038cb1fe18d Mon Sep 17 00:00:00 2001 From: yanweiqi <592838129@qq.com> Date: Tue, 3 Jan 2023 14:56:20 +0800 Subject: [PATCH] Test: Bind delta merge storage with executor ut (#6561) ref pingcap/tiflash#4609 --- dbms/src/Debug/MockComputeServerManager.cpp | 3 +- dbms/src/Debug/MockComputeServerManager.h | 11 +- dbms/src/Debug/MockStorage.cpp | 175 +++++++++++++++++- dbms/src/Debug/MockStorage.h | 71 +++++-- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 21 ++- .../Flash/Coprocessor/InterpreterUtils.cpp | 2 +- .../Flash/Coprocessor/MockSourceStream.cpp | 2 +- dbms/src/Flash/Coprocessor/MockSourceStream.h | 5 +- dbms/src/Flash/FlashService.cpp | 3 +- dbms/src/Flash/FlashService.h | 7 +- dbms/src/Flash/Planner/PhysicalPlan.cpp | 4 +- .../plans/PhysicalMockExchangeReceiver.cpp | 5 +- .../plans/PhysicalMockExchangeReceiver.h | 2 +- .../Planner/plans/PhysicalMockTableScan.cpp | 12 +- .../tests/gtest_aggregation_executor.cpp | 28 --- .../Flash/tests/gtest_executors_with_dm.cpp | 158 ++++++++++++++++ .../src/Flash/tests/gtest_filter_executor.cpp | 4 +- dbms/src/Flash/tests/gtest_limit_executor.cpp | 11 +- dbms/src/Flash/tests/gtest_mock_storage.cpp | 56 ++++++ .../Flash/tests/gtest_projection_executor.cpp | 23 +-- .../gtest_squashing_hash_join_transform.cpp | 4 +- dbms/src/Flash/tests/gtest_topn_executor.cpp | 69 ++++--- dbms/src/Interpreters/Context.cpp | 17 +- dbms/src/Interpreters/Context.h | 12 +- dbms/src/Server/FlashGrpcServerHolder.cpp | 2 +- dbms/src/Server/FlashGrpcServerHolder.h | 3 +- dbms/src/TestUtils/ExecutorTestUtils.cpp | 8 +- dbms/src/TestUtils/ExecutorTestUtils.h | 35 ++++ dbms/src/TestUtils/mockExecutor.cpp | 64 ++++++- dbms/src/TestUtils/mockExecutor.h | 37 +++- 30 files changed, 691 insertions(+), 163 deletions(-) create mode 100644 dbms/src/Flash/tests/gtest_executors_with_dm.cpp create mode 100644 dbms/src/Flash/tests/gtest_mock_storage.cpp diff --git a/dbms/src/Debug/MockComputeServerManager.cpp b/dbms/src/Debug/MockComputeServerManager.cpp index 64b5c6fc7d4..f83e3bba83c 100644 --- a/dbms/src/Debug/MockComputeServerManager.cpp +++ b/dbms/src/Debug/MockComputeServerManager.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include #include +#include #include #include #include @@ -71,7 +72,7 @@ void MockComputeServerManager::startServers(const LoggerPtr & log_ptr, int start prepareMockMPPServerInfo(); } -void MockComputeServerManager::setMockStorage(MockStorage & mock_storage) +void MockComputeServerManager::setMockStorage(MockStorage * mock_storage) { for (const auto & server : server_map) { diff --git a/dbms/src/Debug/MockComputeServerManager.h b/dbms/src/Debug/MockComputeServerManager.h index dd622e00b70..b15af5f8c6c 100644 --- a/dbms/src/Debug/MockComputeServerManager.h +++ b/dbms/src/Debug/MockComputeServerManager.h @@ -14,11 +14,13 @@ #pragma once -#include #include #include -namespace DB::tests +namespace DB +{ +class MockStorage; +namespace tests { /** Hold Mock Compute Server to manage the lifetime of them. * Maintains Mock Compute Server info. @@ -35,7 +37,7 @@ class MockComputeServerManager : public ext::Singleton void startServers(const LoggerPtr & log_ptr, int start_idx); /// set MockStorage for Compute Server in order to mock input columns. - void setMockStorage(MockStorage & mock_storage); + void setMockStorage(MockStorage * mock_storage); /// stop all servers. void reset(); @@ -58,4 +60,5 @@ class MockComputeServerManager : public ext::Singleton std::unordered_map> server_map; std::unordered_map server_config_map; }; -} // namespace DB::tests +} // namespace tests +} // namespace DB diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 7a19da7085b..5f40a34a477 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -11,11 +11,19 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include +#include +#include +#include +#include +#include -namespace DB::tests +namespace DB { +/// for table scan void MockStorage::addTableSchema(const String & name, const MockColumnInfoVec & columnInfos) { name_to_id_map[name] = MockTableIdGenerator::instance().nextTableId(); @@ -63,6 +71,145 @@ MockColumnInfoVec MockStorage::getTableSchema(const String & name) throw Exception(fmt::format("Failed to get table schema by table name '{}'", name)); } +/// for delta merge +void MockStorage::addTableSchemaForDeltaMerge(const String & name, const MockColumnInfoVec & columnInfos) +{ + name_to_id_map_for_delta_merge[name] = MockTableIdGenerator::instance().nextTableId(); + table_schema_for_delta_merge[getTableIdForDeltaMerge(name)] = columnInfos; + addTableInfoForDeltaMerge(name, columnInfos); +} + +void MockStorage::addTableDataForDeltaMerge(Context & context, const String & name, ColumnsWithTypeAndName & columns) +{ + auto table_id = getTableIdForDeltaMerge(name); + addNamesAndTypesForDeltaMerge(table_id, columns); + if (storage_delta_merge_map.find(table_id) == storage_delta_merge_map.end()) + { + // init + ASTPtr astptr(new ASTIdentifier(name, ASTIdentifier::Kind::Table)); + NamesAndTypesList names_and_types_list; + for (const auto & column : columns) + { + names_and_types_list.emplace_back(column.name, column.type); + } + astptr->children.emplace_back(new ASTIdentifier(columns[0].name)); + + storage_delta_merge_map[table_id] = StorageDeltaMerge::create("TiFlash", + /* db_name= */ "default", + name, + std::nullopt, + ColumnsDescription{names_and_types_list}, + astptr, + 0, + context); + + auto storage = storage_delta_merge_map[table_id]; + assert(storage); + storage->startup(); + + // write data to DeltaMergeStorage + ASTPtr insertptr(new ASTInsertQuery()); + BlockOutputStreamPtr output = storage->write(insertptr, context.getSettingsRef()); + + Block insert_block{columns}; + + output->writePrefix(); + output->write(insert_block); + output->writeSuffix(); + } +} + +BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 id) +{ + auto storage = storage_delta_merge_map[id]; + auto column_infos = table_schema_for_delta_merge[id]; + assert(storage); + assert(!column_infos.empty()); + Names column_names; + for (const auto & column_info : column_infos) + column_names.push_back(column_info.first); + + auto scan_context = std::make_shared(); + QueryProcessingStage::Enum stage; + SelectQueryInfo query_info; + query_info.query = std::make_shared(); + query_info.mvcc_query_info = std::make_unique(context.getSettingsRef().resolve_locks, std::numeric_limits::max(), scan_context); + BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams + + BlockInputStreamPtr in = ins[0]; + return in; +} + +void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns) +{ + TableInfo table_info; + table_info.name = name; + table_info.id = getTableIdForDeltaMerge(name); + int i = 0; + for (const auto & column : columns) + { + TiDB::ColumnInfo ret; + std::tie(ret.name, ret.tp) = column; + // TODO: find a way to assign decimal field's flen. + if (ret.tp == TiDB::TP::TypeNewDecimal) + ret.flen = 65; + ret.id = i++; + table_info.columns.push_back(std::move(ret)); + } + table_infos_for_delta_merge[name] = table_info; +} + +void MockStorage::addNamesAndTypesForDeltaMerge(Int64 table_id, const ColumnsWithTypeAndName & columns) +{ + NamesAndTypes names_and_types; + for (const auto & column : columns) + { + names_and_types.emplace_back(column.name, column.type); + } + names_and_types_map_for_delta_merge[table_id] = names_and_types; +} + +Int64 MockStorage::getTableIdForDeltaMerge(const String & name) +{ + if (name_to_id_map_for_delta_merge.find(name) != name_to_id_map_for_delta_merge.end()) + { + return name_to_id_map_for_delta_merge[name]; + } + throw Exception(fmt::format("Failed to get table id by table name '{}'", name)); +} + +bool MockStorage::tableExistsForDeltaMerge(Int64 table_id) +{ + return table_schema_for_delta_merge.find(table_id) != table_schema_for_delta_merge.end(); +} + +MockColumnInfoVec MockStorage::getTableSchemaForDeltaMerge(const String & name) +{ + if (tableExistsForDeltaMerge(getTableIdForDeltaMerge(name))) + { + return table_schema_for_delta_merge[getTableIdForDeltaMerge(name)]; + } + throw Exception(fmt::format("Failed to get table schema by table name '{}'", name)); +} + +MockColumnInfoVec MockStorage::getTableSchemaForDeltaMerge(Int64 table_id) +{ + if (tableExistsForDeltaMerge(table_id)) + { + return table_schema_for_delta_merge[table_id]; + } + throw Exception(fmt::format("Failed to get table schema by table id '{}'", table_id)); +} + +NamesAndTypes MockStorage::getNameAndTypesForDeltaMerge(Int64 table_id) +{ + if (tableExistsForDeltaMerge(table_id)) + { + return names_and_types_map_for_delta_merge[table_id]; + } + throw Exception(fmt::format("Failed to get NamesAndTypes by table id '{}'", table_id)); +} + /// for exchange receiver void MockStorage::addExchangeSchema(const String & exchange_name, const MockColumnInfoVec & columnInfos) { @@ -107,6 +254,25 @@ MockColumnInfoVec MockStorage::getExchangeSchema(const String & exchange_name) throw Exception(fmt::format("Failed to get exchange schema by exchange name '{}'", exchange_name)); } +void MockStorage::clear() +{ + for (auto [_, storage] : storage_delta_merge_map) + { + storage->drop(); + storage->removeFromTMTContext(); + } +} + +void MockStorage::setUseDeltaMerge(bool flag) +{ + use_storage_delta_merge = flag; +} + +bool MockStorage::useDeltaMerge() const +{ + return use_storage_delta_merge; +} + // use this function to determine where to cut the columns, // and how many rows are needed for each partition of MPP task. CutColumnInfo getCutColumnInfo(size_t rows, Int64 partition_id, Int64 partition_num) @@ -192,4 +358,9 @@ TableInfo MockStorage::getTableInfo(const String & name) { return table_infos[name]; } -} // namespace DB::tests + +TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) +{ + return table_infos_for_delta_merge[name]; +} +} // namespace DB diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index ff5ff0627b3..6e2590e0df0 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -13,14 +13,20 @@ // limitations under the License. #pragma once #include +#include #include #include #include #include +#include #include -namespace DB::tests + +namespace DB { +class StorageDeltaMerge; +class Context; + using MockColumnInfo = std::pair; using MockColumnInfoVec = std::vector; using TableInfo = TiDB::TableInfo; @@ -41,6 +47,7 @@ class MockTableIdGenerator : public ext::Singleton /** Responsible for mock data for executor tests and mpp tests. * 1. Use this class to add mock table schema and table column data. * 2. Use this class to add mock exchange schema and exchange column data. + * 3. Use this class to add table schema and table column data into StorageDeltaMerge. */ class MockStorage { @@ -50,46 +57,88 @@ class MockStorage void addTableData(const String & name, ColumnsWithTypeAndName & columns); - Int64 getTableId(const String & name); + MockColumnInfoVec getTableSchema(const String & name); + + ColumnsWithTypeAndName getColumns(Int64 table_id); bool tableExists(Int64 table_id); - ColumnsWithTypeAndName getColumns(Int64 table_id); + /// for storage delta merge table scan + void addTableSchemaForDeltaMerge(const String & name, const MockColumnInfoVec & columnInfos); - MockColumnInfoVec getTableSchema(const String & name); + void addTableDataForDeltaMerge(Context & context, const String & name, ColumnsWithTypeAndName & columns); + + MockColumnInfoVec getTableSchemaForDeltaMerge(const String & name); + + MockColumnInfoVec getTableSchemaForDeltaMerge(Int64 table_id); + + NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id); + + BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id); + + bool tableExistsForDeltaMerge(Int64 table_id); /// for exchange receiver void addExchangeSchema(const String & exchange_name, const MockColumnInfoVec & columnInfos); void addExchangeData(const String & exchange_name, const ColumnsWithTypeAndName & columns); - bool exchangeExists(const String & executor_id); - bool exchangeExistsWithName(const String & name); - - ColumnsWithTypeAndName getExchangeColumns(const String & executor_id); + MockColumnInfoVec getExchangeSchema(const String & exchange_name); void addExchangeRelation(const String & executor_id, const String & exchange_name); - MockColumnInfoVec getExchangeSchema(const String & exchange_name); + ColumnsWithTypeAndName getExchangeColumns(const String & executor_id); + + bool exchangeExists(const String & executor_id); /// for MPP Tasks, it will split data by partition num, then each MPP service will have a subset of mock data. ColumnsWithTypeAndName getColumnsForMPPTableScan(const TiDBTableScan & table_scan, Int64 partition_id, Int64 partition_num); TableInfo getTableInfo(const String & name); + TableInfo getTableInfoForDeltaMerge(const String & name); + + /// clear for StorageDeltaMerge + void clear(); + + void setUseDeltaMerge(bool flag); + + bool useDeltaMerge() const; private: /// for mock table scan std::unordered_map name_to_id_map; /// std::unordered_map table_schema; /// std::unordered_map table_columns; /// - std::unordered_map table_infos; + std::unordered_map table_infos; /// /// for mock exchange receiver std::unordered_map executor_id_to_name_map; /// std::unordered_map exchange_schemas; /// std::unordered_map exchange_columns; /// + /// for mock storage delta merge + std::unordered_map name_to_id_map_for_delta_merge; /// + std::unordered_map table_schema_for_delta_merge; /// + std::unordered_map> storage_delta_merge_map; // + std::unordered_map table_infos_for_delta_merge; /// + std::unordered_map names_and_types_map_for_delta_merge; /// + + // storage delta merge can be used in executor ut test only. + bool use_storage_delta_merge = false; + private: + /// for table scan + Int64 getTableId(const String & name); + void addTableInfo(const String & name, const MockColumnInfoVec & columns); + + // for storage delta merge table scan + Int64 getTableIdForDeltaMerge(const String & name); + + void addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns); + + void addNamesAndTypesForDeltaMerge(Int64 table_id, const ColumnsWithTypeAndName & columns); + /// for exchange receiver + bool exchangeExistsWithName(const String & name); }; -} // namespace DB::tests +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 34fd53c3455..2dadb2cd54c 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -163,7 +164,8 @@ AnalysisResult analyzeExpressions( // for tests, we need to mock tableScan blockInputStream as the source stream. void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline) { - if (!context.mockStorage().tableExists(table_scan.getLogicalTableID())) + // Interpreter test will not use columns in MockStorage + if (context.isInterpreterTest()) { auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan"); auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types); @@ -174,8 +176,18 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s pipeline.streams.emplace_back(mock_table_scan_stream); } } + else if (context.mockStorage()->useDeltaMerge()) + { + assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); + auto names_and_types = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); + auto mock_table_scan_stream = context.mockStorage()->getStreamFromDeltaMerge(context, table_scan.getLogicalTableID()); + analyzer = std::make_unique(std::move(names_and_types), context); + pipeline.streams.push_back(mock_table_scan_stream); + } else { + /// build from user input blocks. + assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); NamesAndTypes names_and_types; std::vector> mock_table_scan_streams; if (context.isMPPTest()) @@ -536,7 +548,8 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) // for tests, we need to mock ExchangeReceiver blockInputStream as the source stream. void DAGQueryBlockInterpreter::handleMockExchangeReceiver(DAGPipeline & pipeline) { - if (!context.mockStorage().exchangeExists(query_block.source_name)) + // Interpreter test will not use columns in MockStorage + if (context.isInterpreterTest() || !context.mockStorage()->exchangeExists(query_block.source_name)) { for (size_t i = 0; i < max_streams; ++i) { @@ -635,7 +648,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver) { - if (unlikely(context.isExecutorTest())) + if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) handleMockExchangeReceiver(pipeline); else { @@ -734,7 +747,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) // execute exchange_sender if (query_block.exchange_sender) { - if (unlikely(context.isExecutorTest())) + if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) handleMockExchangeSender(pipeline); else { diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 899362521cb..67662ad3ad0 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -176,7 +176,7 @@ void executeCreatingSets( { DAGContext & dag_context = *context.getDAGContext(); /// add union to run in parallel if needed - if (unlikely(context.isExecutorTest())) + if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for test"); else if (context.isMPPTest()) executeUnion(pipeline, max_streams, log, /*ignore_block=*/true, "for mpp test"); diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.cpp b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp index c8e662adc32..df770b113aa 100644 --- a/dbms/src/Flash/Coprocessor/MockSourceStream.cpp +++ b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp @@ -18,7 +18,7 @@ namespace DB { std::pair>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan) { - ColumnsWithTypeAndName columns_with_type_and_name = context.mockStorage().getColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num); + ColumnsWithTypeAndName columns_with_type_and_name = context.mockStorage()->getColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num); return cutStreams(context, columns_with_type_and_name, max_streams, log); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.h b/dbms/src/Flash/Coprocessor/MockSourceStream.h index c84d37d2a06..ec771a731dc 100644 --- a/dbms/src/Flash/Coprocessor/MockSourceStream.h +++ b/dbms/src/Flash/Coprocessor/MockSourceStream.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -70,9 +71,9 @@ std::pair>> mockSourceStr { ColumnsWithTypeAndName columns_with_type_and_name; if constexpr (std::is_same_v) - columns_with_type_and_name = context.mockStorage().getExchangeColumns(executor_id); + columns_with_type_and_name = context.mockStorage()->getExchangeColumns(executor_id); else - columns_with_type_and_name = context.mockStorage().getColumns(table_id); + columns_with_type_and_name = context.mockStorage()->getColumns(table_id); return cutStreams(context, columns_with_type_and_name, max_streams, log); } diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 57aa23a7e3c..f864a81e789 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -490,7 +491,7 @@ grpc::Status FlashService::Compact(grpc::ServerContext * grpc_context, const kvr return manual_compact_manager->handleRequest(request, response); } -void FlashService::setMockStorage(MockStorage & mock_storage_) +void FlashService::setMockStorage(MockStorage * mock_storage_) { mock_storage = mock_storage_; } diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 9223060db37..c1b38d3d8f5 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -34,8 +34,7 @@ namespace DB class IServer; class IAsyncCallData; class EstablishCallData; - -using MockStorage = tests::MockStorage; +class MockStorage; using MockMPPServerInfo = tests::MockMPPServerInfo; namespace Management @@ -79,7 +78,7 @@ class FlashService : public tikvpb::Tikv::Service grpc::Status Compact(grpc::ServerContext * grpc_context, const kvrpcpb::CompactRequest * request, kvrpcpb::CompactResponse * response) override; - void setMockStorage(MockStorage & mock_storage_); + void setMockStorage(MockStorage * mock_storage_); void setMockMPPServerInfo(MockMPPServerInfo & mpp_test_info_); Context * getContext() { return context; } @@ -97,7 +96,7 @@ class FlashService : public tikvpb::Tikv::Service std::unique_ptr manual_compact_manager; /// for mpp unit test. - MockStorage mock_storage; + MockStorage * mock_storage = nullptr; MockMPPServerInfo mpp_test_info{}; // Put thread pool member(s) at the end so that ensure it will be destroyed firstly. diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index df19f325e66..2e23d8fcecf 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -126,7 +126,7 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec { GET_METRIC(tiflash_coprocessor_executor_count, type_exchange_sender).Increment(); buildFinalProjection(fmt::format("{}_", executor_id), true); - if (unlikely(context.isExecutorTest())) + if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) pushBack(PhysicalMockExchangeSender::build(executor_id, log, popBack())); else { @@ -139,7 +139,7 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec case tipb::ExecType::TypeExchangeReceiver: { GET_METRIC(tiflash_coprocessor_executor_count, type_exchange_receiver).Increment(); - if (unlikely(context.isExecutorTest())) + if (unlikely(context.isExecutorTest() || context.isInterpreterTest())) pushBack(PhysicalMockExchangeReceiver::build(context, executor_id, log, executor->exchange_receiver())); else { diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp index 76928e73490..e3d2a379b0a 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp @@ -38,7 +38,8 @@ std::pair mockSchemaAndStreams( size_t max_streams = dag_context.initialize_concurrency; assert(max_streams > 0); - if (!context.mockStorage().exchangeExists(executor_id)) + // Interpreter test will not use columns in MockStorage + if (context.isInterpreterTest() || !context.mockStorage()->exchangeExists(executor_id)) { /// build with default blocks. for (size_t i = 0; i < max_streams; ++i) @@ -81,8 +82,6 @@ PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build( const LoggerPtr & log, const tipb::ExchangeReceiver & exchange_receiver) { - assert(context.isExecutorTest()); - auto [schema, mock_streams] = mockSchemaAndStreams(context, executor_id, log, exchange_receiver); auto physical_mock_exchange_receiver = std::make_shared( diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h index 4c95babea6b..41b8f4139a3 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h @@ -23,7 +23,7 @@ namespace DB /** * A physical plan node that generates MockExchangeReceiverInputStream. * Used in gtest to test execution logic. - * Only available with `context.isExecutorTest() == true`. + * Only available with `context.isExecutorTest() == true || context.isInterpreterTest() == true`. */ class PhysicalMockExchangeReceiver : public PhysicalLeaf { diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp index 23833b9af0b..6a56715f217 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp @@ -34,22 +34,28 @@ std::pair mockSchemaAndStreams( { NamesAndTypes schema; BlockInputStreams mock_streams; - auto & dag_context = *context.getDAGContext(); size_t max_streams = dag_context.initialize_concurrency; assert(max_streams > 0); - if (!context.mockStorage().tableExists(table_scan.getLogicalTableID())) + // Interpreter test will not use columns in MockStorage + if (context.isInterpreterTest()) { - /// build with default blocks. schema = genNamesAndTypes(table_scan, "mock_table_scan"); auto columns_with_type_and_name = getColumnWithTypeAndName(schema); for (size_t i = 0; i < max_streams; ++i) mock_streams.emplace_back(std::make_shared(columns_with_type_and_name, context.getSettingsRef().max_block_size)); } + else if (context.mockStorage()->useDeltaMerge()) + { + assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); + schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_scan.getLogicalTableID())); + } else { /// build from user input blocks. + assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); NamesAndTypes names_and_types; std::vector> mock_table_scan_streams; if (context.isMPPTest()) diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index e5bd3c692ef..f89990ad381 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -33,34 +33,6 @@ namespace tests class AggExecutorTestRunner : public ExecutorTest { public: - using ColStringNullableType = std::optional::FieldType>; - using ColInt8NullableType = std::optional::FieldType>; - using ColInt16NullableType = std::optional::FieldType>; - using ColInt32NullableType = std::optional::FieldType>; - using ColInt64NullableType = std::optional::FieldType>; - using ColFloat32NullableType = std::optional::FieldType>; - using ColFloat64NullableType = std::optional::FieldType>; - using ColMyDateNullableType = std::optional::FieldType>; - using ColMyDateTimeNullableType = std::optional::FieldType>; - using ColDecimalNullableType = std::optional::FieldType>; - using ColUInt64Type = typename TypeTraits::FieldType; - using ColFloat64Type = typename TypeTraits::FieldType; - using ColStringType = typename TypeTraits::FieldType; - - using ColumnWithNullableString = std::vector; - using ColumnWithNullableInt8 = std::vector; - using ColumnWithNullableInt16 = std::vector; - using ColumnWithNullableInt32 = std::vector; - using ColumnWithNullableInt64 = std::vector; - using ColumnWithNullableFloat32 = std::vector; - using ColumnWithNullableFloat64 = std::vector; - using ColumnWithNullableMyDate = std::vector; - using ColumnWithNullableMyDateTime = std::vector; - using ColumnWithNullableDecimal = std::vector; - using ColumnWithUInt64 = std::vector; - using ColumnWithFloat64 = std::vector; - using ColumnWithString = std::vector; - ~AggExecutorTestRunner() override = default; void initializeContext() override diff --git a/dbms/src/Flash/tests/gtest_executors_with_dm.cpp b/dbms/src/Flash/tests/gtest_executors_with_dm.cpp new file mode 100644 index 00000000000..a4c802547ac --- /dev/null +++ b/dbms/src/Flash/tests/gtest_executors_with_dm.cpp @@ -0,0 +1,158 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ +namespace tests +{ +class ExecutorsWithDMTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + context.mockStorage()->setUseDeltaMerge(true); + // note that + // 1. the first column is pk. + // 2. The decimal type is not supported. + context.addMockDeltaMerge({"test_db", "t0"}, + {{"col0", TiDB::TP::TypeLongLong}}, + {{toVec("col0", {0, 1, 2, 3, 4, 5, 6, 7})}}); + + context.addMockDeltaMerge({"test_db", "t1"}, + {{"col0", TiDB::TP::TypeLongLong}, + {"col1", TiDB::TP::TypeString}}, + {{toVec("col0", {0, 1, 2, 3, 4, 5, 6, 7})}, + {toNullableVec("col1", {"col1-0", "col1-1", "col1-2", {}, "col1-4", {}, "col1-6", "col1-7"})}}); + + context.addMockDeltaMerge({"test_db", "t2"}, + {{"col0", TiDB::TP::TypeLongLong}, + {"col1", TiDB::TP::TypeTiny}, + {"col2", TiDB::TP::TypeShort}, + {"col3", TiDB::TP::TypeLong}, + {"col4", TiDB::TP::TypeLongLong}, + {"col5", TiDB::TP::TypeFloat}, + {"col6", TiDB::TP::TypeDouble}, + {"col7", TiDB::TP::TypeDate}, + {"col8", TiDB::TP::TypeDatetime}, + {"col9", TiDB::TP::TypeString}}, + {toVec("col0", col_id), + toNullableVec("col1", col_tinyint), + toNullableVec("col2", col_smallint), + toNullableVec("col3", col_int), + toNullableVec("col4", col_bigint), + toNullableVec("col5", col_float), + toNullableVec("col6", col_double), + toNullableVec("col7", col_mydate), + toNullableVec("col8", col_mydatetime), + toNullableVec("col9", col_string)}); + } + + void TearDown() override + { + context.mockStorage()->clear(); + } + + ColumnWithInt64 col_id{1, 2, 3, 4, 5, 6, 7, 8, 9}; + ColumnWithNullableInt8 col_tinyint{1, 2, 3, {}, {}, 0, 0, -1, -2}; + ColumnWithNullableInt16 col_smallint{2, 3, {}, {}, 0, -1, -2, 4, 0}; + ColumnWithNullableInt32 col_int{4, {}, {}, 0, 123, -1, -1, 123, 4}; + ColumnWithNullableInt64 col_bigint{2, 2, {}, 0, -1, {}, -1, 0, 123}; + ColumnWithNullableFloat32 col_float{3.3, {}, 0, 4.0, 3.3, 5.6, -0.1, -0.1, {}}; + ColumnWithNullableFloat64 col_double{0.1, 0, 1.1, 1.1, 1.2, {}, {}, -1.2, -1.2}; + ColumnWithNullableMyDate col_mydate{1000000, 2000000, {}, 300000, 1000000, {}, 0, 2000000, {}}; + ColumnWithNullableMyDateTime col_mydatetime{2000000, 0, {}, 3000000, 1000000, {}, 0, 2000000, 1000000}; + ColumnWithNullableString col_string{{}, "pingcap", "PingCAP", {}, "PINGCAP", "PingCAP", {}, "Shanghai", "Shanghai"}; +}; + +TEST_F(ExecutorsWithDMTestRunner, Basic) +try +{ + // table scan + auto request = context + .scan("test_db", "t0") + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col0", {0, 1, 2, 3, 4, 5, 6, 7})}}); + + request = context + .scan("test_db", "t1") + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col0", {0, 1, 2, 3, 4, 5, 6, 7})}, + {toNullableVec("col1", {"col1-0", "col1-1", "col1-2", {}, "col1-4", {}, "col1-6", "col1-7"})}}); + + request = context + .scan("test_db", "t2") + .build(context); + + executeAndAssertColumnsEqual( + request, + {toNullableVec({1, 2, 3, 4, 5, 6, 7, 8, 9}), + toNullableVec(col_tinyint), + toNullableVec(col_smallint), + toNullableVec(col_int), + toNullableVec(col_bigint), + toNullableVec(col_float), + toNullableVec(col_double), + toNullableVec(col_mydate), + toNullableVec(col_mydatetime), + toNullableVec(col_string)}); + + // projection + request = context + .scan("test_db", "t1") + .project({col("col0")}) + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col0", {0, 1, 2, 3, 4, 5, 6, 7})}}); + + request = context + .scan("test_db", "t1") + .project({col("col1")}) + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col1", {"col1-0", "col1-1", "col1-2", {}, "col1-4", {}, "col1-6", "col1-7"})}}); + + // filter + request = context + .scan("test_db", "t0") + .filter(lt(col("col0"), lit(Field(static_cast(4))))) + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col0", {0, 1, 2, 3})}}); + + request = context + .scan("test_db", "t1") + .filter(lt(col("col0"), lit(Field(static_cast(4))))) + .build(context); + executeAndAssertColumnsEqual( + request, + {{toNullableVec("col0", {0, 1, 2, 3})}, + {toNullableVec("col1", {"col1-0", "col1-1", "col1-2", {}})}}); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_filter_executor.cpp b/dbms/src/Flash/tests/gtest_filter_executor.cpp index 96617b3a587..9d28dbd90b3 100644 --- a/dbms/src/Flash/tests/gtest_filter_executor.cpp +++ b/dbms/src/Flash/tests/gtest_filter_executor.cpp @@ -89,7 +89,7 @@ try } CATCH -TEST_F(FilterExecutorTestRunner, and_or) +TEST_F(FilterExecutorTestRunner, andOr) try { auto test_one = [&](const ASTPtr & condition, const ColumnsWithTypeAndName & expect_columns) { @@ -197,7 +197,7 @@ try } CATCH -TEST_F(FilterExecutorTestRunner, convert_bool) +TEST_F(FilterExecutorTestRunner, convertBool) try { { diff --git a/dbms/src/Flash/tests/gtest_limit_executor.cpp b/dbms/src/Flash/tests/gtest_limit_executor.cpp index 049efafc333..00c48d8d779 100644 --- a/dbms/src/Flash/tests/gtest_limit_executor.cpp +++ b/dbms/src/Flash/tests/gtest_limit_executor.cpp @@ -23,9 +23,6 @@ namespace tests class LimitExecutorTestRunner : public DB::tests::ExecutorTest { public: - using ColDataType = std::optional::FieldType>; - using ColumnWithData = std::vector; - void initializeContext() override { ExecutorTest::initializeContext(); @@ -44,7 +41,7 @@ class LimitExecutorTestRunner : public DB::tests::ExecutorTest const String db_name{"test_db"}; const String table_name{"projection_test_table"}; const String col_name{"limit_col"}; - const ColumnWithData col0{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"}; + const ColumnWithNullableString col0{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"}; }; TEST_F(LimitExecutorTestRunner, Limit) @@ -64,9 +61,9 @@ try if (limit_num == 0) expect_cols = {}; else if (limit_num > col_data_num) - expect_cols = {toNullableVec(col_name, ColumnWithData(col0.begin(), col0.end()))}; + expect_cols = {toNullableVec(col_name, ColumnWithNullableString(col0.begin(), col0.end()))}; else - expect_cols = {toNullableVec(col_name, ColumnWithData(col0.begin(), col0.begin() + limit_num))}; + expect_cols = {toNullableVec(col_name, ColumnWithNullableString(col0.begin(), col0.begin() + limit_num))}; WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN ASSERT_COLUMNS_EQ_R(executeStreams(request), expect_cols); @@ -81,7 +78,7 @@ TEST_F(LimitExecutorTestRunner, RawQuery) try { String query = "select * from test_db.projection_test_table limit 1"; - auto cols = {toNullableVec(col_name, ColumnWithData(col0.begin(), col0.begin() + 1))}; + auto cols = {toNullableVec(col_name, ColumnWithNullableString(col0.begin(), col0.begin() + 1))}; ASSERT_COLUMNS_EQ_R(executeRawQuery(query, 1), cols); } CATCH diff --git a/dbms/src/Flash/tests/gtest_mock_storage.cpp b/dbms/src/Flash/tests/gtest_mock_storage.cpp new file mode 100644 index 00000000000..1d38b2aefd1 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_mock_storage.cpp @@ -0,0 +1,56 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ +namespace tests +{ +class MockStorageTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + } + + // single column table + const ColumnWithNullableString col1{"col1-0", "col1-1", "col1-2", {}, "col1-4", {}, "col1-6", "col1-7"}; + const ColumnWithInt64 col0{0, 1, 2, 3, 4, 5, 6, 7}; + + MockStorage mock_storage; +}; + +TEST_F(MockStorageTestRunner, DeltaMergeStorageBasic) +try +{ + ColumnsWithTypeAndName columns{toVec("col0", col0), toNullableVec("col1", col1)}; + mock_storage.addTableSchemaForDeltaMerge("test", {{"col0", TiDB::TP::TypeLongLong}, {"col1", TiDB::TP::TypeString}}); + mock_storage.addTableDataForDeltaMerge(context.context, "test", columns); + auto in = mock_storage.getStreamFromDeltaMerge(context.context, 1); + + ASSERT_INPUTSTREAM_BLOCK_UR( + in, + Block(columns)); + + mock_storage.clear(); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_projection_executor.cpp b/dbms/src/Flash/tests/gtest_projection_executor.cpp index 25a234ee385..b4d5e4e1aa0 100644 --- a/dbms/src/Flash/tests/gtest_projection_executor.cpp +++ b/dbms/src/Flash/tests/gtest_projection_executor.cpp @@ -23,9 +23,6 @@ namespace tests class ProjectionExecutorTestRunner : public DB::tests::ExecutorTest { public: - using ColDataString = std::vector::FieldType>>; - using ColDataInt32 = std::vector::FieldType>>; - void initializeContext() override { ExecutorTest::initializeContext(); @@ -70,23 +67,23 @@ class ProjectionExecutorTestRunner : public DB::tests::ExecutorTest }; /// Prepare column data - const ColDataString col0{"col0-0", "col0-1", "", "col0-2", {}, "col0-3", ""}; - const ColDataString col1{"col1-0", {}, "", "col1-1", "", "col1-2", "col1-3"}; - const ColDataString col2{"", "col2-0", "col2-1", {}, "col2-3", {}, "col2-4"}; - const ColDataInt32 col3{1, {}, 0, -111111, {}, 0, 9999}; + const ColumnWithNullableString col0{"col0-0", "col0-1", "", "col0-2", {}, "col0-3", ""}; + const ColumnWithNullableString col1{"col1-0", {}, "", "col1-1", "", "col1-2", "col1-3"}; + const ColumnWithNullableString col2{"", "col2-0", "col2-1", {}, "col2-3", {}, "col2-4"}; + const ColumnWithNullableInt32 col3{1, {}, 0, -111111, {}, 0, 9999}; /** Each value in col4 should be different from each other so that topn * could sort the columns into an unique result, or multi-results could * be right. */ - const ColDataInt32 col4{0, 5, -123, -234, {}, 24353, 9999}; + const ColumnWithNullableInt32 col4{0, 5, -123, -234, {}, 24353, 9999}; /// Results after sorted by col4 - const ColDataString col0_sorted_asc{{}, "col0-2", "", "col0-0", "col0-1", "", "col0-3"}; - const ColDataString col1_sorted_asc{"", "col1-1", "", "col1-0", {}, "col1-3", "col1-2"}; - const ColDataString col2_sorted_asc{"col2-3", {}, "col2-1", "", "col2-0", "col2-4", {}}; - const ColDataInt32 col3_sorted_asc{{}, -111111, 0, 1, {}, 9999, 0}; - const ColDataInt32 col4_sorted_asc{{}, -234, -123, 0, 5, 9999, 24353}; + const ColumnWithNullableString col0_sorted_asc{{}, "col0-2", "", "col0-0", "col0-1", "", "col0-3"}; + const ColumnWithNullableString col1_sorted_asc{"", "col1-1", "", "col1-0", {}, "col1-3", "col1-2"}; + const ColumnWithNullableString col2_sorted_asc{"col2-3", {}, "col2-1", "", "col2-0", "col2-4", {}}; + const ColumnWithNullableInt32 col3_sorted_asc{{}, -111111, 0, 1, {}, 9999, 0}; + const ColumnWithNullableInt32 col4_sorted_asc{{}, -234, -123, 0, 5, 9999, 24353}; /// Prepare some names std::vector col_names{"col0", "col1", "col2", "col3", "col4"}; diff --git a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp index 1f61878da48..87fa98558bb 100644 --- a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp +++ b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp @@ -33,9 +33,9 @@ class SquashingHashJoinBlockTransformTest : public ::testing::Test static void check(Blocks blocks, UInt64 max_block_size) { - for (size_t i = 0; i < blocks.size(); ++i) + for (auto & block : blocks) { - ASSERT(blocks[i].rows() <= max_block_size); + ASSERT(block.rows() <= max_block_size); } } }; diff --git a/dbms/src/Flash/tests/gtest_topn_executor.cpp b/dbms/src/Flash/tests/gtest_topn_executor.cpp index 3cd848643ba..88b8b40f6fc 100644 --- a/dbms/src/Flash/tests/gtest_topn_executor.cpp +++ b/dbms/src/Flash/tests/gtest_topn_executor.cpp @@ -23,11 +23,6 @@ namespace tests class TopNExecutorTestRunner : public DB::tests::ExecutorTest { public: - using ColStringType = std::optional::FieldType>; - using ColInt32Type = std::optional::FieldType>; - using ColumnWithString = std::vector; - using ColumnWithInt32 = std::vector; - void initializeContext() override { ExecutorTest::initializeContext(); @@ -97,14 +92,14 @@ class TopNExecutorTestRunner : public DB::tests::ExecutorTest const String table_single_name{"topn_single_table"}; /// For single column test const String single_col_name{"single_col"}; - ColumnWithString col0{"col0-0", "col0-1", "col0-2", {}, "col0-4", {}, "col0-6", "col0-7"}; + ColumnWithNullableString col0{"col0-0", "col0-1", "col0-2", {}, "col0-4", {}, "col0-6", "col0-7"}; const String table_name{"clerk"}; const std::vector col_name{"age", "gender", "country", "salary"}; - ColumnWithInt32 col_age{{}, 27, 32, 36, {}, 34}; - ColumnWithString col_gender{"female", "female", "male", "female", "male", "male"}; - ColumnWithString col_country{"korea", "usa", "usa", "china", "china", "china"}; - ColumnWithInt32 col_salary{1300, 0, {}, 900, {}, -300}; + ColumnWithNullableInt32 col_age{{}, 27, 32, 36, {}, 34}; + ColumnWithNullableString col_gender{"female", "female", "male", "female", "male", "male"}; + ColumnWithNullableString col_country{"korea", "usa", "usa", "china", "china", "china"}; + ColumnWithNullableInt32 col_salary{1300, 0, {}, 900, {}, -300}; }; TEST_F(TopNExecutorTestRunner, TopN) @@ -121,7 +116,7 @@ try bool is_desc; is_desc = static_cast(i); /// Set descent or ascent if (is_desc) - sort(col0.begin(), col0.end(), std::greater()); /// Sort col0 for the following comparison + sort(col0.begin(), col0.end(), std::greater()); /// Sort col0 for the following comparison else sort(col0.begin(), col0.end()); @@ -131,9 +126,9 @@ try expect_cols.clear(); if (limit_num == 0 || limit_num > col_data_num) - expect_cols.push_back({toNullableVec(single_col_name, ColumnWithString(col0.begin(), col0.end()))}); + expect_cols.push_back({toNullableVec(single_col_name, ColumnWithNullableString(col0.begin(), col0.end()))}); else - expect_cols.push_back({toNullableVec(single_col_name, ColumnWithString(col0.begin(), col0.begin() + limit_num))}); + expect_cols.push_back({toNullableVec(single_col_name, ColumnWithNullableString(col0.begin(), col0.begin() + limit_num))}); executeAndAssertColumnsEqual(request, expect_cols.back()); } @@ -142,18 +137,18 @@ try { /// Test multi-columns - expect_cols = {{toNullableVec(col_name[0], ColumnWithInt32{36, 34, 32, 27, {}, {}}), - toNullableVec(col_name[1], ColumnWithString{"female", "male", "male", "female", "male", "female"}), - toNullableVec(col_name[2], ColumnWithString{"china", "china", "usa", "usa", "china", "korea"}), - toNullableVec(col_name[3], ColumnWithInt32{900, -300, {}, 0, {}, 1300})}, - {toNullableVec(col_name[0], ColumnWithInt32{32, {}, 34, 27, 36, {}}), - toNullableVec(col_name[1], ColumnWithString{"male", "male", "male", "female", "female", "female"}), - toNullableVec(col_name[2], ColumnWithString{"usa", "china", "china", "usa", "china", "korea"}), - toNullableVec(col_name[3], ColumnWithInt32{{}, {}, -300, 0, 900, 1300})}, - {toNullableVec(col_name[0], ColumnWithInt32{34, {}, 32, 36, {}, 27}), - toNullableVec(col_name[1], ColumnWithString{"male", "male", "male", "female", "female", "female"}), - toNullableVec(col_name[2], ColumnWithString{"china", "china", "usa", "china", "korea", "usa"}), - toNullableVec(col_name[3], ColumnWithInt32{-300, {}, {}, 900, 1300, 0})}}; + expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{36, 34, 32, 27, {}, {}}), + toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "male", "female"}), + toNullableVec(col_name[2], ColumnWithNullableString{"china", "china", "usa", "usa", "china", "korea"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{900, -300, {}, 0, {}, 1300})}, + {toNullableVec(col_name[0], ColumnWithNullableInt32{32, {}, 34, 27, 36, {}}), + toNullableVec(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}), + toNullableVec(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "china", "korea"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{{}, {}, -300, 0, 900, 1300})}, + {toNullableVec(col_name[0], ColumnWithNullableInt32{34, {}, 32, 36, {}, 27}), + toNullableVec(col_name[1], ColumnWithNullableString{"male", "male", "male", "female", "female", "female"}), + toNullableVec(col_name[2], ColumnWithNullableString{"china", "china", "usa", "china", "korea", "usa"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{-300, {}, {}, 900, 1300, 0})}}; std::vector order_by_items{ /// select * from clerk order by age DESC, gender DESC; @@ -190,10 +185,10 @@ try { /// "and" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithInt32{{}, {}, 32, 27, 36, 34}), - toNullableVec(col_name[1], ColumnWithString{"female", "male", "male", "female", "female", "male"}), - toNullableVec(col_name[2], ColumnWithString{"korea", "china", "usa", "usa", "china", "china"}), - toNullableVec(col_name[3], ColumnWithInt32{1300, {}, {}, 0, 900, -300})}}; + expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{{}, {}, 32, 27, 36, 34}), + toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}), + toNullableVec(col_name[2], ColumnWithNullableString{"korea", "china", "usa", "usa", "china", "china"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 0, 900, -300})}}; { /// select * from clerk order by age and salary ASC limit 100; @@ -208,10 +203,10 @@ try { /// "equal" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithInt32{27, 36, 34, 32, {}, {}}), - toNullableVec(col_name[1], ColumnWithString{"female", "female", "male", "male", "female", "male"}), - toNullableVec(col_name[2], ColumnWithString{"usa", "china", "china", "usa", "korea", "china"}), - toNullableVec(col_name[3], ColumnWithInt32{0, 900, -300, {}, 1300, {}})}}; + expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{27, 36, 34, 32, {}, {}}), + toNullableVec(col_name[1], ColumnWithNullableString{"female", "female", "male", "male", "female", "male"}), + toNullableVec(col_name[2], ColumnWithNullableString{"usa", "china", "china", "usa", "korea", "china"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{0, 900, -300, {}, 1300, {}})}}; { /// select age, salary from clerk order by age = salary DESC limit 100; @@ -226,10 +221,10 @@ try { /// "greater" function - expect_cols = {{toNullableVec(col_name[0], ColumnWithInt32{{}, 32, {}, 36, 27, 34}), - toNullableVec(col_name[1], ColumnWithString{"female", "male", "male", "female", "female", "male"}), - toNullableVec(col_name[2], ColumnWithString{"korea", "usa", "china", "china", "usa", "china"}), - toNullableVec(col_name[3], ColumnWithInt32{1300, {}, {}, 900, 0, -300})}}; + expect_cols = {{toNullableVec(col_name[0], ColumnWithNullableInt32{{}, 32, {}, 36, 27, 34}), + toNullableVec(col_name[1], ColumnWithNullableString{"female", "male", "male", "female", "female", "male"}), + toNullableVec(col_name[2], ColumnWithNullableString{"korea", "usa", "china", "china", "usa", "china"}), + toNullableVec(col_name[3], ColumnWithNullableInt32{1300, {}, {}, 900, 0, -300})}}; { /// select age, gender, country, salary from clerk order by age > salary ASC limit 100; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 868de593933..d55b41d6187 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -1831,7 +1832,7 @@ size_t Context::getMaxStreams() const bool is_cop_request = false; if (dag_context != nullptr) { - if (isExecutorTest()) + if (isExecutorTest() || isInterpreterTest()) max_streams = dag_context->initialize_concurrency; else if (!dag_context->isBatchCop() && !dag_context->isMPPTask()) { @@ -1879,6 +1880,16 @@ void Context::setExecutorTest() test_mode = executor_test; } +bool Context::isInterpreterTest() const +{ + return test_mode == interpreter_test; +} + +void Context::setInterpreterTest() +{ + test_mode = interpreter_test; +} + bool Context::isCopTest() const { return test_mode == cop_test; @@ -1894,12 +1905,12 @@ bool Context::isTest() const return test_mode != non_test; } -void Context::setMockStorage(MockStorage & mock_storage_) +void Context::setMockStorage(MockStorage * mock_storage_) { mock_storage = mock_storage_; } -MockStorage Context::mockStorage() const +MockStorage * Context::mockStorage() const { return mock_storage; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 9625bff7dc1..8c4d625955b 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -99,9 +98,9 @@ using WriteLimiterPtr = std::shared_ptr; class ReadLimiter; using ReadLimiterPtr = std::shared_ptr; using MockMPPServerInfo = DB::tests::MockMPPServerInfo; -using MockStorage = DB::tests::MockStorage; class TiFlashSecurityConfig; using TiFlashSecurityConfigPtr = std::shared_ptr; +class MockStorage; enum class PageStorageRunMode : UInt8; namespace DM @@ -164,12 +163,13 @@ class Context non_test, mpp_test, cop_test, + interpreter_test, executor_test, cancel_test }; TestMode test_mode = non_test; - MockStorage mock_storage; + MockStorage * mock_storage = nullptr; MockMPPServerInfo mpp_server_info{}; TimezoneInfo timezone_info; @@ -483,12 +483,14 @@ class Context void setCancelTest(); bool isExecutorTest() const; void setExecutorTest(); + bool isInterpreterTest() const; + void setInterpreterTest(); void setCopTest(); bool isCopTest() const; bool isTest() const; - void setMockStorage(MockStorage & mock_storage_); - MockStorage mockStorage() const; + void setMockStorage(MockStorage * mock_storage_); + MockStorage * mockStorage() const; MockMPPServerInfo mockMPPServerInfo() const; void setMockMPPServerInfo(MockMPPServerInfo & info); diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 0ab13e2dd85..e983e879b61 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -245,7 +245,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() } } -void FlashGrpcServerHolder::setMockStorage(MockStorage & mock_storage) +void FlashGrpcServerHolder::setMockStorage(MockStorage * mock_storage) { flash_service->setMockStorage(mock_storage); } diff --git a/dbms/src/Server/FlashGrpcServerHolder.h b/dbms/src/Server/FlashGrpcServerHolder.h index 09eea8cab97..7dbe311cfa1 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.h +++ b/dbms/src/Server/FlashGrpcServerHolder.h @@ -27,7 +27,6 @@ namespace DB { -using MockStorage = tests::MockStorage; using MockMPPServerInfo = tests::MockMPPServerInfo; class FlashGrpcServerHolder @@ -40,7 +39,7 @@ class FlashGrpcServerHolder const LoggerPtr & log_); ~FlashGrpcServerHolder(); - void setMockStorage(MockStorage & mock_storage); + void setMockStorage(MockStorage * mock_storage); void setMockMPPServerInfo(MockMPPServerInfo info); std::unique_ptr & flashService(); diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 7ca140ee650..26563a68dea 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -62,6 +63,7 @@ void ExecutorTest::initializeContext() { dag_context_ptr = std::make_unique(1024); context = MockDAGRequestContext(TiFlashTestEnv::getContext()); + context.initMockStorage(); dag_context_ptr->log = Logger::get("executorTest"); TiFlashTestEnv::getGlobalContext().setExecutorTest(); } @@ -96,8 +98,8 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std: { DAGContext dag_context(*request, "interpreter_test", concurrency); context.context.setDAGContext(&dag_context); - context.context.setExecutorTest(); - // Currently, don't care about regions information in interpreter tests. + context.context.setInterpreterTest(); + // Don't care regions information in interpreter tests. auto query_executor = queryExecute(context.context, /*internal=*/true); ASSERT_EQ(Poco::trim(expected_string), Poco::trim(query_executor->dump())); } @@ -236,7 +238,7 @@ DB::ColumnsWithTypeAndName ExecutorTest::executeRawQuery(const String & query, s context.context, query, [&](const String & database_name, const String & table_name) { - return context.mockStorage().getTableInfo(database_name + "." + table_name); + return context.mockStorage()->getTableInfo(database_name + "." + table_name); }, properties); diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h index 79c279f2822..ac0f05b3bef 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.h +++ b/dbms/src/TestUtils/ExecutorTestUtils.h @@ -115,4 +115,39 @@ class ExecutorTest : public ::testing::Test #define ASSERT_DAGREQUEST_EQAUL(str, request) dagRequestEqual((str), (request)); #define ASSERT_BLOCKINPUTSTREAM_EQAUL(str, request, concurrency) executeInterpreter((str), (request), (concurrency)) +// nullable type +using ColStringNullableType = std::optional::FieldType>; +using ColInt8NullableType = std::optional::FieldType>; +using ColInt16NullableType = std::optional::FieldType>; +using ColInt32NullableType = std::optional::FieldType>; +using ColInt64NullableType = std::optional::FieldType>; +using ColFloat32NullableType = std::optional::FieldType>; +using ColFloat64NullableType = std::optional::FieldType>; +using ColMyDateNullableType = std::optional::FieldType>; +using ColMyDateTimeNullableType = std::optional::FieldType>; +using ColDecimalNullableType = std::optional::FieldType>; + +// non nullable type +using ColUInt64Type = typename TypeTraits::FieldType; +using ColInt64Type = typename TypeTraits::FieldType; +using ColFloat64Type = typename TypeTraits::FieldType; +using ColStringType = typename TypeTraits::FieldType; + +// nullable column +using ColumnWithNullableString = std::vector; +using ColumnWithNullableInt8 = std::vector; +using ColumnWithNullableInt16 = std::vector; +using ColumnWithNullableInt32 = std::vector; +using ColumnWithNullableInt64 = std::vector; +using ColumnWithNullableFloat32 = std::vector; +using ColumnWithNullableFloat64 = std::vector; +using ColumnWithNullableMyDate = std::vector; +using ColumnWithNullableMyDateTime = std::vector; +using ColumnWithNullableDecimal = std::vector; + +// non nullable column +using ColumnWithInt64 = std::vector; +using ColumnWithUInt64 = std::vector; +using ColumnWithFloat64 = std::vector; +using ColumnWithString = std::vector; } // namespace DB::tests diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 66fabb59fbd..78762d538c6 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -37,6 +37,7 @@ #include #include +#include #include namespace DB::tests @@ -361,36 +362,45 @@ DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, boo void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos) { - mock_storage.addTableSchema(db + "." + table, columnInfos); + mock_storage->addTableSchema(db + "." + table, columnInfos); } void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos) { - mock_storage.addTableSchema(name.first + "." + name.second, columnInfos); + mock_storage->addTableSchema(name.first + "." + name.second, columnInfos); } void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos) { - mock_storage.addExchangeSchema(name, columnInfos); + mock_storage->addExchangeSchema(name, columnInfos); } void MockDAGRequestContext::addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns) { - mock_storage.addTableData(db + "." + table, columns); + mock_storage->addTableData(db + "." + table, columns); } void MockDAGRequestContext::addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns) { - mock_storage.addTableData(name.first + "." + name.second, columns); + mock_storage->addTableData(name.first + "." + name.second, columns); +} + +void MockDAGRequestContext::addMockDeltaMergeData(const String & db, const String & table, ColumnsWithTypeAndName columns) +{ + for (const auto & column : columns) + RUNTIME_ASSERT(!column.name.empty(), "mock column must have column name"); + + mock_storage->addTableDataForDeltaMerge(context, db + "." + table, columns); } void MockDAGRequestContext::addExchangeReceiverColumnData(const String & name, ColumnsWithTypeAndName columns) { - mock_storage.addExchangeData(name, columns); + mock_storage->addExchangeData(name, columns); } void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) { + assert(columnInfos.size() == columns.size()); addMockTable(db, table, columnInfos); addMockTableColumnData(db, table, columns); } @@ -402,23 +412,57 @@ void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockC addMockTableColumnData(name, columns); } +void MockDAGRequestContext::addMockDeltaMergeSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos) +{ + mock_storage->addTableSchemaForDeltaMerge(db + "." + table, columnInfos); +} + +void MockDAGRequestContext::addMockDeltaMerge(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) +{ + assert(columnInfos.size() == columns.size()); + assert(context.mockStorage()->useDeltaMerge()); + addMockDeltaMergeSchema(db, table, columnInfos); + addMockDeltaMergeData(db, table, columns); +} + +void MockDAGRequestContext::addMockDeltaMerge(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) +{ + assert(columnInfos.size() == columns.size()); + addMockDeltaMergeSchema(name.first, name.second, columnInfos); + addMockDeltaMergeData(name.first, name.second, columns); +} + void MockDAGRequestContext::addExchangeReceiver(const String & name, MockColumnInfoVec columnInfos, ColumnsWithTypeAndName columns) { + assert(columnInfos.size() == columns.size()); addExchangeRelationSchema(name, columnInfos); addExchangeReceiverColumnData(name, columns); } DAGRequestBuilder MockDAGRequestContext::scan(const String & db_name, const String & table_name) { - auto table_info = mock_storage.getTableInfo(db_name + "." + table_name); - return DAGRequestBuilder(index, collation).mockTable({db_name, table_name}, table_info, mock_storage.getTableSchema(db_name + "." + table_name)); + if (!mock_storage->useDeltaMerge()) + { + auto table_info = mock_storage->getTableInfo(db_name + "." + table_name); + return DAGRequestBuilder(index, collation).mockTable({db_name, table_name}, table_info, mock_storage->getTableSchema(db_name + "." + table_name)); + } + else + { + auto table_info = mock_storage->getTableInfoForDeltaMerge(db_name + "." + table_name); + return DAGRequestBuilder(index, collation).mockTable({db_name, table_name}, table_info, mock_storage->getTableSchemaForDeltaMerge(db_name + "." + table_name)); + } } DAGRequestBuilder MockDAGRequestContext::receive(const String & exchange_name, uint64_t fine_grained_shuffle_stream_count) { - auto builder = DAGRequestBuilder(index, collation).exchangeReceiver(mock_storage.getExchangeSchema(exchange_name), fine_grained_shuffle_stream_count); + auto builder = DAGRequestBuilder(index, collation).exchangeReceiver(mock_storage->getExchangeSchema(exchange_name), fine_grained_shuffle_stream_count); receiver_source_task_ids_map[builder.getRoot()->name] = {}; - mock_storage.addExchangeRelation(builder.getRoot()->name, exchange_name); + mock_storage->addExchangeRelation(builder.getRoot()->name, exchange_name); return builder; } + +void MockDAGRequestContext::initMockStorage() +{ + mock_storage = std::make_unique(); +} } // namespace DB::tests diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 8c9b2697ee3..afc4d7a3bf0 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -24,7 +24,11 @@ #include #include -namespace DB::tests +#include + +namespace DB +{ +namespace tests { using MockColumnInfo = std::pair; using MockColumnInfoVec = std::vector; @@ -161,24 +165,34 @@ class MockDAGRequestContext { public: explicit MockDAGRequestContext(Context context_, Int32 collation_ = TiDB::ITiDBCollator::UTF8MB4_BIN) - : context(context_) + : index(0) + , context(context_) , collation(-abs(collation_)) { - index = 0; } DAGRequestBuilder createDAGRequestBuilder() { return DAGRequestBuilder(index); } - + /// mock column table scan void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos); void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos); - void addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos); void addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns); + void addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns); + void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); - void addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns); + + /// mock DeltaMerge table scan + void addMockDeltaMerge(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); + void addMockDeltaMerge(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); + + void addMockDeltaMergeSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos); + void addMockDeltaMergeData(const String & db, const String & table, ColumnsWithTypeAndName columns); + + /// mock column exchange receiver + void addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos); void addExchangeReceiverColumnData(const String & name, ColumnsWithTypeAndName columns); void addExchangeReceiver(const String & name, MockColumnInfoVec columnInfos, ColumnsWithTypeAndName columns); @@ -188,11 +202,12 @@ class MockDAGRequestContext void setCollation(Int32 collation_) { collation = convertToTiDBCollation(collation_); } Int32 getCollation() const { return abs(collation); } - MockStorage & mockStorage() { return mock_storage; } + MockStorage * mockStorage() { return mock_storage.get(); } + void initMockStorage(); private: size_t index; - MockStorage mock_storage; + std::unique_ptr mock_storage = nullptr; public: // Currently don't support task_id, so the following to structure is useless, @@ -212,6 +227,8 @@ MockWindowFrame buildDefaultRowsFrame(); #define col(name) buildColumn((name)) #define lit(field) buildLiteral((field)) + +// expressions #define concat(expr1, expr2) makeASTFunction("concat", (expr1), (expr2)) #define plusInt(expr1, expr2) makeASTFunction("plusint", (expr1), (expr2)) #define minusInt(expr1, expr2) makeASTFunction("minusint", (expr1), (expr2)) @@ -239,5 +256,5 @@ MockWindowFrame buildDefaultRowsFrame(); #define Lag1(expr) makeASTFunction("Lag", (expr)) #define Lag2(expr1, expr2) makeASTFunction("Lag", (expr1), (expr2)) #define Lag3(expr1, expr2, expr3) makeASTFunction("Lag", (expr1), (expr2), (expr3)) - -} // namespace DB::tests +} // namespace tests +} // namespace DB