From 01fd0126815fc340c2b2534b9e07f0d8d20502dc Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 4 Jul 2024 18:34:39 +0800 Subject: [PATCH] Storages: Fix the issue of obtaining incorrect column information when there are virtual columns in the query --- dbms/src/DataStreams/RuntimeFilter.cpp | 11 ++- dbms/src/DataStreams/RuntimeFilter.h | 6 +- dbms/src/Debug/MockStorage.cpp | 17 +++- dbms/src/Debug/MockStorage.h | 2 + .../Schedule/Tasks/Impls/RFWaitTask.h | 2 +- .../Planner/Plans/PhysicalMockTableScan.cpp | 19 +++- .../Planner/Plans/PhysicalMockTableScan.h | 3 + .../Storages/DeltaMerge/ColumnDefine_fwd.h | 7 ++ .../DeltaMerge/Filter/PushDownFilter.cpp | 14 ++- .../Storages/DeltaMerge/Filter/RSOperator.cpp | 9 +- .../Storages/DeltaMerge/Filter/RSOperator.h | 2 +- .../DeltaMerge/FilterParser/FilterParser.cpp | 60 ++++++++---- .../DeltaMerge/FilterParser/FilterParser.h | 9 +- .../ReadThread/UnorderedInputStream.cpp | 2 +- .../tests/gtest_dm_minmax_index.cpp | 9 +- dbms/src/Storages/StorageDeltaMerge.cpp | 9 +- dbms/src/Storages/StorageDeltaMerge.h | 4 + .../Storages/StorageDisaggregatedRemote.cpp | 2 +- .../Storages/tests/gtest_filter_parser.cpp | 2 +- .../tests/gtests_parse_push_down_filter.cpp | 2 +- dbms/src/TiDB/Schema/TiDB.h | 27 ++--- .../expr/generated_columns.test | 98 +++++++++++++++++++ .../expr/generated_columns2.test | 49 ++++++++++ tests/fullstack-test/expr/runtime_filter.test | 46 +++++++++ 24 files changed, 346 insertions(+), 65 deletions(-) create mode 100644 tests/fullstack-test/expr/generated_columns.test create mode 100644 tests/fullstack-test/expr/generated_columns2.test create mode 100644 tests/fullstack-test/expr/runtime_filter.test diff --git a/dbms/src/DataStreams/RuntimeFilter.cpp b/dbms/src/DataStreams/RuntimeFilter.cpp index 85834f505e7..313dae88433 100644 --- a/dbms/src/DataStreams/RuntimeFilter.cpp +++ b/dbms/src/DataStreams/RuntimeFilter.cpp @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string return true; } -DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read) +void RuntimeFilter::setTargetAttr( + const DM::ColumnInfos & scan_column_infos, + const DM::ColumnDefines & table_column_defines) +{ + target_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines); +} + +DM::RSOperatorPtr RuntimeFilter::parseToRSOperator() { switch (rf_type) { @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t return DM::FilterParser::parseRFInExpr( rf_type, target_expr, - columns_to_read, + target_attr, in_values_set->getUniqueSetElements(), timezone_info); case tipb::MIN_MAX: diff --git a/dbms/src/DataStreams/RuntimeFilter.h b/dbms/src/DataStreams/RuntimeFilter.h index 8a8ab3a8c96..d2887af13da 100644 --- a/dbms/src/DataStreams/RuntimeFilter.h +++ b/dbms/src/DataStreams/RuntimeFilter.h @@ -47,7 +47,7 @@ class RuntimeFilter } source_expr = rf_pb.source_expr_list().Get(0); target_expr = rf_pb.target_expr_list().Get(0); - }; + } std::string getSourceColumnName() const; @@ -77,7 +77,8 @@ class RuntimeFilter bool await(int64_t ms_remaining); - DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read); + void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines); + DM::RSOperatorPtr parseToRSOperator(); const int id; @@ -86,6 +87,7 @@ class RuntimeFilter tipb::Expr source_expr; tipb::Expr target_expr; + std::optional target_attr; const tipb::RuntimeFilterType rf_type; TimezoneInfo timezone_info; // thread safe diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index a00aedefcf4..9db555e20ff 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -200,10 +200,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( if (filter_conditions && filter_conditions->hasValue()) { auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( filter_conditions->conditions, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -227,10 +228,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( else { static const google::protobuf::RepeatedPtrField empty_filters{}; + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( empty_filters, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -257,10 +259,11 @@ void MockStorage::buildExecFromDeltaMerge( if (filter_conditions && filter_conditions->hasValue()) { auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( filter_conditions->conditions, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -289,10 +292,11 @@ void MockStorage::buildExecFromDeltaMerge( else { static const google::protobuf::RepeatedPtrField empty_filters{}; + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( empty_filters, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -576,6 +580,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) return table_infos_for_delta_merge[name]; } +DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id) +{ + return storage_delta_merge_map[table_id]->getStoreColumnDefines(); +} + ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos) { ColumnID col_id = 0; diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index 090cd36f37f..8cda78df16b 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -146,6 +147,7 @@ class MockStorage TableInfo getTableInfo(const String & name); TableInfo getTableInfoForDeltaMerge(const String & name); + DM::ColumnDefines getStoreColumnDefines(Int64 table_id); size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h index 78885b84abb..ee72fc65205 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h @@ -68,7 +68,7 @@ class RFWaitTask : public Task { for (const RuntimeFilterPtr & rf : ready_rf_list) { - auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); + auto rs_operator = rf->parseToRSOperator(); task_pool->appendRSOperator(rs_operator); } DM::SegmentReadTaskScheduler::instance().add(task_pool); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index 8554e434818..c10d983e81b 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl( { if (auto * source_op = dynamic_cast(group_builder.getCurBuilder(i).source_op.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context) { if (auto * p_stream = dynamic_cast(local_stream.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } } } + +RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context) +{ + auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id); + auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos); + auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id); + auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + for (auto & rf : rfs) + { + rf->setTargetAttr(column_infos, column_defines); + } + return rfs; +} } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 778b77e5932..4e305314d3f 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf void buildRuntimeFilterInLocalStream(Context & context); + RuntimeFilteList getRuntimeFilterList(Context & context); + private: FilterConditions filter_conditions; Block sample_block; diff --git a/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h b/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h index bbb13d01c0e..1bfd2d5aa1f 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h +++ b/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h @@ -20,10 +20,17 @@ #include #include +namespace TiDB +{ +struct ColumnInfo; +} + namespace DB::DM { struct ColumnDefine; using ColumnDefines = std::vector; using ColumnDefinesPtr = std::shared_ptr; using ColumnDefineMap = std::unordered_map; + +using ColumnInfos = std::vector; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp index 4f0c6ceb787..961f1846ab9 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp @@ -99,10 +99,14 @@ PushDownFilterPtr PushDownFilter::build( has_cast) { NamesWithAliases project_cols; - for (size_t i = 0; i < columns_to_read.size(); ++i) + for (size_t i = 0; i < table_scan_column_info.size(); ++i) { - if (filter_col_id_set.contains(columns_to_read[i].id)) - project_cols.emplace_back(casted_columns[i], columns_to_read[i].name); + if (filter_col_id_set.contains(table_scan_column_info[i].id)) + { + auto it = columns_to_read_map.find(table_scan_column_info[i].id); + RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id); + project_cols.emplace_back(casted_columns[i], it->second.name); + } } actions->add(ExpressionAction::project(project_cols)); @@ -162,15 +166,15 @@ PushDownFilterPtr PushDownFilter::build( if (unlikely(dag_query == nullptr)) return EMPTY_FILTER; + const auto & columns_to_read_info = dag_query->source_columns; // build rough set operator const auto rs_operator = RSOperator::build( dag_query, - columns_to_read, + columns_to_read_info, table_column_defines, context.getSettingsRef().dt_enable_rough_set_filter, tracing_logger); // build push down filter - const auto & columns_to_read_info = dag_query->source_columns; const auto & pushed_down_filters = dag_query->pushed_down_filters; if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty()) { diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index cc24a5948b9..f20984704ba 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -51,7 +51,7 @@ RSOperatorPtr createUnsupported(const String & reason) RSOperatorPtr RSOperator::build( const std::unique_ptr & dag_query, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const ColumnDefines & table_column_defines, bool enable_rs_filter, const LoggerPtr & tracing_logger) @@ -75,8 +75,11 @@ RSOperatorPtr RSOperator::build( // Maybe throw an exception? Or check if `type` is nullptr before creating filter? return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; - DM::RSOperatorPtr rs_operator - = FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), tracing_logger); + auto rs_operator = FilterParser::parseDAGQuery( + *dag_query, + scan_column_infos, + std::move(create_attr_by_column_id), + tracing_logger); if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h index 40dcba49ce6..676fadf475e 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -56,7 +56,7 @@ class RSOperator static RSOperatorPtr build( const std::unique_ptr & dag_query, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const ColumnDefines & table_column_defines, bool enable_rs_filter, const LoggerPtr & tracing_logger); diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 8541b5dc5b0..28a86e9e6c5 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -79,19 +79,19 @@ inline bool isRoughSetFilterSupportType(const Int32 field_type) return false; } -ColumnDefine getColumnDefineForColumnExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read) +ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnInfos & scan_column_infos) { assert(isColumnExpr(expr)); auto column_index = decodeDAGInt64(expr.val()); - if (column_index < 0 || column_index >= static_cast(columns_to_read.size())) + if (column_index < 0 || column_index >= static_cast(scan_column_infos.size())) { throw TiFlashException( Errors::Coprocessor::BadRequest, "Column index out of bound: {}, should in [0,{})", column_index, - columns_to_read.size()); + scan_column_infos.size()); } - return columns_to_read[column_index]; + return scan_column_infos[column_index].id; } // convert literal value from timezone specified in cop request to UTC in-place @@ -110,7 +110,7 @@ inline void convertFieldWithTimezone(Field & value, const TimezoneInfo & timezon inline RSOperatorPtr parseTiCompareExpr( // const tipb::Expr & expr, const FilterParser::RSFilterType filter_type, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info) { @@ -163,8 +163,8 @@ inline RSOperatorPtr parseTiCompareExpr( // tipb::ScalarFuncSig_Name(expr.sig()), field_type)); - const auto col = getColumnDefineForColumnExpr(child, columns_to_read); - attr = creator(col.id); + auto col_id = getColumnIDForColumnExpr(child, scan_column_infos); + attr = creator(col_id); } else if (isLiteralExpr(child)) { @@ -243,7 +243,7 @@ inline RSOperatorPtr parseTiCompareExpr( // RSOperatorPtr parseTiExpr( const tipb::Expr & expr, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, const LoggerPtr & log) @@ -273,7 +273,7 @@ RSOperatorPtr parseTiExpr( fmt::format("logical not with {} children is not supported", expr.children_size())); if (const auto & child = expr.children(0); likely(isFunctionExpr(child))) - return createNot(parseTiExpr(child, columns_to_read, creator, timezone_info, log)); + return createNot(parseTiExpr(child, scan_column_infos, creator, timezone_info, log)); else return createUnsupported(fmt::format( "child of logical not is not function, child_type={}", @@ -288,7 +288,7 @@ RSOperatorPtr parseTiExpr( for (const auto & child : expr.children()) { if (likely(isFunctionExpr(child))) - children.emplace_back(parseTiExpr(child, columns_to_read, creator, timezone_info, log)); + children.emplace_back(parseTiExpr(child, scan_column_infos, creator, timezone_info, log)); else children.emplace_back(createUnsupported(fmt::format( "child of logical operator is not function, child_type={}", @@ -307,7 +307,7 @@ RSOperatorPtr parseTiExpr( case FilterParser::RSFilterType::Less: case FilterParser::RSFilterType::LessEqual: case FilterParser::RSFilterType::In: - return parseTiCompareExpr(expr, filter_type, columns_to_read, creator, timezone_info); + return parseTiCompareExpr(expr, filter_type, scan_column_infos, creator, timezone_info); case FilterParser::RSFilterType::IsNull: { @@ -330,8 +330,8 @@ RSOperatorPtr parseTiExpr( auto field_type = child.field_type().tp(); if (isRoughSetFilterSupportType(field_type)) { - const auto col = getColumnDefineForColumnExpr(child, columns_to_read); - Attr attr = creator(col.id); + auto col_id = getColumnIDForColumnExpr(child, scan_column_infos); + auto attr = creator(col_id); return createIsNull(attr); } return createUnsupported( @@ -358,7 +358,7 @@ RSOperatorPtr parseTiExpr( RSOperatorPtr FilterParser::parseDAGQuery( const DAGQueryInfo & dag_info, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, FilterParser::AttrCreatorByColumnID && creator, const LoggerPtr & log) { @@ -367,11 +367,11 @@ RSOperatorPtr FilterParser::parseDAGQuery( children.reserve(dag_info.filters.size() + dag_info.pushed_down_filters.size()); for (const auto & filter : dag_info.filters) { - children.emplace_back(cop::parseTiExpr(filter, columns_to_read, creator, dag_info.timezone_info, log)); + children.emplace_back(cop::parseTiExpr(filter, scan_column_infos, creator, dag_info.timezone_info, log)); } for (const auto & filter : dag_info.pushed_down_filters) { - children.emplace_back(cop::parseTiExpr(filter, columns_to_read, creator, dag_info.timezone_info, log)); + children.emplace_back(cop::parseTiExpr(filter, scan_column_infos, creator, dag_info.timezone_info, log)); } if (children.empty()) @@ -385,7 +385,7 @@ RSOperatorPtr FilterParser::parseDAGQuery( RSOperatorPtr FilterParser::parseRFInExpr( const tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, - const ColumnDefines & columns_to_read, + const std::optional & target_attr, const std::set & setElements, const TimezoneInfo & timezone_info) { @@ -393,11 +393,10 @@ RSOperatorPtr FilterParser::parseRFInExpr( { case tipb::IN: { - if (!isColumnExpr(target_expr)) + if (!isColumnExpr(target_expr) || !target_attr) return createUnsupported( fmt::format("rf target expr is not column expr, expr.tp={}", tipb::ExprType_Name(target_expr.tp()))); - auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read); - auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; + const auto & attr = *target_attr; if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone) { Fields values; @@ -421,6 +420,27 @@ RSOperatorPtr FilterParser::parseRFInExpr( } } +std::optional FilterParser::createAttr( + const tipb::Expr & expr, + const ColumnInfos & scan_column_infos, + const ColumnDefines & table_column_defines) +{ + if (!isColumnExpr(expr)) + { + return std::nullopt; + } + auto col_id = cop::getColumnIDForColumnExpr(expr, scan_column_infos); + auto it = std::find_if( // + table_column_defines.cbegin(), + table_column_defines.cend(), + [col_id](const ColumnDefine & cd) { return cd.id == col_id; }); + if (it != table_column_defines.cend()) + { + return Attr{.col_name = it->name, .col_id = it->id, .type = it->type}; + } + return std::nullopt; +} + bool FilterParser::isRSFilterSupportType(const Int32 field_type) { return cop::isRoughSetFilterSupportType(field_type); diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 1a504b48abc..dd959c74bf0 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -41,7 +41,7 @@ class FilterParser using AttrCreatorByColumnID = std::function; static RSOperatorPtr parseDAGQuery( const DAGQueryInfo & dag_info, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, AttrCreatorByColumnID && creator, const LoggerPtr & log); @@ -49,10 +49,15 @@ class FilterParser static RSOperatorPtr parseRFInExpr( tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, - const ColumnDefines & columns_to_read, + const std::optional & target_attr, const std::set & setElements, const TimezoneInfo & timezone_info); + static std::optional createAttr( + const tipb::Expr & expr, + const ColumnInfos & scan_column_infos, + const ColumnDefines & table_column_defines); + static bool isRSFilterSupportType(Int32 field_type); /// Some helper structure diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp index 77ce2e50062..f53cef162fa 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp @@ -43,7 +43,7 @@ void UnorderedInputStream::pushDownReadyRFList(std::vector rea { for (const RuntimeFilterPtr & rf : readyRFList) { - auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); + auto rs_operator = rf->parseToRSOperator(); task_pool->appendRSOperator(rs_operator); } } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index 6c39ebf6737..324064a3c2a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -2235,10 +2235,15 @@ try const ColumnDefines columns_to_read = {ColumnDefine{1, "a", std::make_shared()}, ColumnDefine{2, "b", std::make_shared()}}; + // Only need id of ColumnInfo + TiDB::ColumnInfo a, b; + a.id = 1; + b.id = 2; + ColumnInfos column_infos = {a, b}; auto dag_query = std::make_unique( filters, pushed_down_filters, // Not care now - std::vector{}, // Not care now + column_infos, std::vector{}, 0, context->getTimezoneInfo()); @@ -2252,7 +2257,7 @@ try return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; const auto op - = DB::DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, create_attr_by_column_id, Logger::get()); + = DB::DM::FilterParser::parseDAGQuery(*dag_query, column_infos, create_attr_by_column_id, Logger::get()); EXPECT_EQ( op->toDebugString(), R"raw({"op":"and","children":[{"op":"in","col":"b","value":"["1","2"]},{"op":"unsupported","reason":"Multiple ColumnRef in expression is not supported, sig=InInt"},{"op":"unsupported","reason":"Multiple ColumnRef in expression is not supported, sig=InInt"}]})raw"); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 387e1072e35..5ae9c05a153 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -733,6 +733,7 @@ DM::RowKeyRanges parseMvccQueryInfo( RuntimeFilteList parseRuntimeFilterList( const SelectQueryInfo & query_info, + const DM::ColumnDefines & table_column_defines, const Context & db_context, const LoggerPtr & log) { @@ -743,6 +744,10 @@ RuntimeFilteList parseRuntimeFilterList( auto runtime_filter_list = db_context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds( query_info.dag_query->runtime_filter_ids); LOG_DEBUG(log, "build runtime filter in local stream, list size:{}", runtime_filter_list.size()); + for (auto & rf : runtime_filter_list) + { + rf->setTargetAttr(query_info.dag_query->source_columns, table_column_defines); + } return runtime_filter_list; } } // namespace @@ -797,7 +802,7 @@ BlockInputStreams StorageDeltaMerge::read( auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); - auto runtime_filter_list = parseRuntimeFilterList(query_info, context, log); + auto runtime_filter_list = parseRuntimeFilterList(query_info, store->getTableColumns(), context, tracing_logger); const auto & scan_context = mvcc_query_info.scan_context; @@ -880,7 +885,7 @@ void StorageDeltaMerge::read( auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); - auto runtime_filter_list = parseRuntimeFilterList(query_info, context, log); + auto runtime_filter_list = parseRuntimeFilterList(query_info, store->getTableColumns(), context, tracing_logger); const auto & scan_context = mvcc_query_info.scan_context; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4f3a3651117..fff1b32f413 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -40,6 +40,8 @@ struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr; struct CheckpointIngestInfo; using CheckpointIngestInfoPtr = std::shared_ptr; +class MockStorage; + namespace DM { struct RowKeyRange; @@ -301,6 +303,8 @@ class StorageDeltaMerge Context & global_context; LoggerPtr log; + + friend class MockStorage; }; diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 1ae0bc889e2..78fa9cd9f9b 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -502,7 +502,7 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator( 0, db_context.getTimezoneInfo()); - return DM::RSOperator::build(dag_query, *columns_to_read, *columns_to_read, enable_rs_filter, log); + return DM::RSOperator::build(dag_query, table_scan.getColumns(), *columns_to_read, enable_rs_filter, log); } std::variant StorageDisaggregated::packSegmentReadTasks( diff --git a/dbms/src/Storages/tests/gtest_filter_parser.cpp b/dbms/src/Storages/tests/gtest_filter_parser.cpp index c6de544299b..c75c988368a 100644 --- a/dbms/src/Storages/tests/gtest_filter_parser.cpp +++ b/dbms/src/Storages/tests/gtest_filter_parser.cpp @@ -122,7 +122,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator( return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; - return DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); + return DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log); } // Test cases for col and literal diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index cfab21463f1..4e8c836d4d5 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -117,7 +117,7 @@ DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( }; auto rs_operator - = DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); + = DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log); auto push_down_filter = DM::PushDownFilter::build(rs_operator, table_info.columns, pushed_down_filters, columns_to_read, *ctx, log); return push_down_filter; diff --git a/dbms/src/TiDB/Schema/TiDB.h b/dbms/src/TiDB/Schema/TiDB.h index 0d3689d385b..156625494fe 100644 --- a/dbms/src/TiDB/Schema/TiDB.h +++ b/dbms/src/TiDB/Schema/TiDB.h @@ -203,19 +203,10 @@ struct ColumnInfo #ifdef M #error "Please undefine macro M first." #endif -#define M(f, v) \ - inline bool has##f##Flag() const \ - { \ - return (flag & (v)) != 0; \ - } \ - inline void set##f##Flag() \ - { \ - flag |= (v); \ - } \ - inline void clear##f##Flag() \ - { \ - flag &= (~(v)); \ - } +#define M(f, v) \ + inline bool has##f##Flag() const { return (flag & (v)) != 0; } \ + inline void set##f##Flag() { flag |= (v); } \ + inline void clear##f##Flag() { flag &= (~(v)); } COLUMN_FLAGS(M) #undef M @@ -433,3 +424,13 @@ std::vector toTiDBColumnInfos( const ::google::protobuf::RepeatedPtrField & tipb_column_infos); } // namespace TiDB + +template <> +struct fmt::formatter +{ + template + auto format(const TiDB::ColumnInfo & ci, FormatContext & ctx) const -> decltype(ctx.out()) + { + return fmt::format_to(ctx.out(), "{}", ci.id); + } +}; diff --git a/tests/fullstack-test/expr/generated_columns.test b/tests/fullstack-test/expr/generated_columns.test new file mode 100644 index 00000000000..1e3a15632bf --- /dev/null +++ b/tests/fullstack-test/expr/generated_columns.test @@ -0,0 +1,98 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column e int; + +mysql> insert into test.t(a, c, e) values(1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400), (5, 50, 500), (6, 60, 600), (7, 70, 700), (8, 80, 800), (9, 90, 900); + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 10; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 1 | 2 | 10 | 11 | 100 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 20; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 2 | 3 | 20 | 21 | 200 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 30; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 3 | 4 | 30 | 31 | 300 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 40; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 4 | 5 | 40 | 41 | 400 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 50; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 5 | 6 | 50 | 51 | 500 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 60; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 6 | 7 | 60 | 61 | 600 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 70; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 7 | 8 | 70 | 71 | 700 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 8 | 9 | 80 | 81 | 800 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 8 | 9 | 80 | 81 | 800 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 90; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 9 | 10 | 90 | 91 | 900 | ++------+------+------+------+------+ + +mysql> drop table test.t; diff --git a/tests/fullstack-test/expr/generated_columns2.test b/tests/fullstack-test/expr/generated_columns2.test new file mode 100644 index 00000000000..ded5572b6ab --- /dev/null +++ b/tests/fullstack-test/expr/generated_columns2.test @@ -0,0 +1,49 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column t time(6); + +mysql> insert into test.t(a, c, t) values(1, 2, '000:10:10.123456'), (3, 4, '001:10:10.123456'), (5, 6, '002:10:10.123456'); +mysql> insert into test.t(a, c, t) select a, c, t + 0.001 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.002 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.004 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.008 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.016 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.032 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.064 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.128 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.256 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.512 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 1.024 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 2.048 from test.t; + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> analyze table test.t; + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, hour(t) from test.t where t = '000:10:10.123456'; ++------+------+------+------+---------+ +| a | b | c | d | hour(t) | ++------+------+------+------+---------+ +| 1 | 2 | 2 | 3 | 0 | ++------+------+------+------+---------+ + +mysql> drop table test.t; diff --git a/tests/fullstack-test/expr/runtime_filter.test b/tests/fullstack-test/expr/runtime_filter.test new file mode 100644 index 00000000000..cfbe663c6b6 --- /dev/null +++ b/tests/fullstack-test/expr/runtime_filter.test @@ -0,0 +1,46 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column e int; +mysql> insert into test.t(a, c, e) values(1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400), (5, 50, 500), (6, 60, 600), (7, 70, 700), (8, 80, 800), (9, 90, 900); +mysql> alter table test.t set tiflash replica 1; + +mysql> drop table if exists test.t2; +mysql> create table if not exists test.t2(f int); +mysql> insert into test.t2 values(10); +mysql> alter table test.t2 set tiflash replica 1; + +func> wait_table test t +func> wait_table test t2 + +mysql> set tidb_runtime_filter_mode="LOCAL"; set tidb_isolation_read_engines='tiflash'; select /*+ HASH_JOIN_BUILD(test.t2) */ a, c, e from test.t t, test.t2 t2 where t.c = t2.f; ++------+------+------+ +| a | c | e | ++------+------+------+ +| 1 | 10 | 100 | ++------+------+------+ + +mysql> set tidb_runtime_filter_mode="LOCAL"; set tidb_isolation_read_engines='tiflash'; select /*+ HASH_JOIN_BUILD(test.t2) */ a, b, c, d, e from test.t t, test.t2 t2 where t.c = t2.f; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 1 | 2 | 10 | 11 | 100 | ++------+------+------+------+------+ + +mysql> drop table test.t;