diff --git a/dbms/src/DataStreams/RuntimeFilter.cpp b/dbms/src/DataStreams/RuntimeFilter.cpp index 62ed46e5de5..a76a8feb9d3 100644 --- a/dbms/src/DataStreams/RuntimeFilter.cpp +++ b/dbms/src/DataStreams/RuntimeFilter.cpp @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string return true; } -DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read) +void RuntimeFilter::setTargetAttr( + const DM::ColumnInfos & scan_column_infos, + const DM::ColumnDefines & table_column_defines) +{ + target_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines); +} + +DM::RSOperatorPtr RuntimeFilter::parseToRSOperator() { switch (rf_type) { @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t return DM::FilterParser::parseRFInExpr( rf_type, target_expr, - columns_to_read, + target_attr, in_values_set->getUniqueSetElements(), timezone_info); case tipb::MIN_MAX: diff --git a/dbms/src/DataStreams/RuntimeFilter.h b/dbms/src/DataStreams/RuntimeFilter.h index 8a8ab3a8c96..1f4ffebef45 100644 --- a/dbms/src/DataStreams/RuntimeFilter.h +++ b/dbms/src/DataStreams/RuntimeFilter.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -47,7 +48,7 @@ class RuntimeFilter } source_expr = rf_pb.source_expr_list().Get(0); target_expr = rf_pb.target_expr_list().Get(0); - }; + } std::string getSourceColumnName() const; @@ -77,7 +78,8 @@ class RuntimeFilter bool await(int64_t ms_remaining); - DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read); + void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines); + DM::RSOperatorPtr parseToRSOperator(); const int id; @@ -86,6 +88,7 @@ class RuntimeFilter tipb::Expr source_expr; tipb::Expr target_expr; + std::optional target_attr; const tipb::RuntimeFilterType rf_type; TimezoneInfo timezone_info; // thread safe diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 28373bd3070..afb46dea2c4 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -199,10 +199,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( if (filter_conditions && filter_conditions->hasValue()) { auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( filter_conditions->conditions, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -226,10 +227,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( else { static const google::protobuf::RepeatedPtrField empty_filters{}; + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( empty_filters, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -256,10 +258,11 @@ void MockStorage::buildExecFromDeltaMerge( if (filter_conditions && filter_conditions->hasValue()) { auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( filter_conditions->conditions, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -288,10 +291,11 @@ void MockStorage::buildExecFromDeltaMerge( else { static const google::protobuf::RepeatedPtrField empty_filters{}; + auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]); query_info.dag_query = std::make_unique( empty_filters, empty_pushed_down_filters, // Not care now - mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]), + scan_column_infos, runtime_filter_ids, rf_max_wait_time_ms, context.getTimezoneInfo()); @@ -575,6 +579,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) return table_infos_for_delta_merge[name]; } +DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id) +{ + return storage_delta_merge_map[table_id]->getStoreColumnDefines(); +} + ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos) { ColumnID col_id = 0; diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index 8561a181ef3..8db3751fef6 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -145,6 +146,7 @@ class MockStorage TableInfo getTableInfo(const String & name); TableInfo getTableInfoForDeltaMerge(const String & name); + DM::ColumnDefines getStoreColumnDefines(Int64 table_id); size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h index 245d45f8e30..95aee02b27b 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h @@ -68,7 +68,7 @@ class RFWaitTask : public Task { for (const RuntimeFilterPtr & rf : ready_rf_list) { - auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); + auto rs_operator = rf->parseToRSOperator(); task_pool->appendRSOperator(rs_operator); } DM::SegmentReadTaskScheduler::instance().add(task_pool); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index f888c2a7624..855a6b0e9fc 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl( { if (auto * source_op = dynamic_cast(group_builder.getCurBuilder(i).source_op.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context) { if (auto * p_stream = dynamic_cast(local_stream.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } } } + +RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context) +{ + auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id); + auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos); + auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id); + auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + for (auto & rf : rfs) + { + rf->setTargetAttr(column_infos, column_defines); + } + return rfs; +} } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 24dea7612f7..7c10a7ffb74 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf void buildRuntimeFilterInLocalStream(Context & context); + RuntimeFilteList getRuntimeFilterList(Context & context); + private: FilterConditions filter_conditions; Block sample_block; diff --git a/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h b/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h new file mode 100644 index 00000000000..961c9280a48 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h @@ -0,0 +1,36 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include + +namespace TiDB +{ +struct ColumnInfo; +} + +namespace DB::DM +{ +struct ColumnDefine; +using ColumnDefines = std::vector; +using ColumnDefinesPtr = std::shared_ptr; +using ColumnDefineMap = std::unordered_map; + +using ColumnInfos = std::vector; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index a7d7c9ea764..89ed23cad12 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -78,32 +78,19 @@ inline bool isRoughSetFilterSupportType(const Int32 field_type) return false; } -ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read) +ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnInfos & scan_column_infos) { assert(isColumnExpr(expr)); auto column_index = decodeDAGInt64(expr.val()); - if (column_index < 0 || column_index >= static_cast(columns_to_read.size())) + if (column_index < 0 || column_index >= static_cast(scan_column_infos.size())) { throw TiFlashException( - "Column index out of bound: " + DB::toString(column_index) + ", should in [0," - + DB::toString(columns_to_read.size()) + ")", - Errors::Coprocessor::BadRequest); + Errors::Coprocessor::BadRequest, + "Column index out of bound: {}, should in [0,{})", + column_index, + scan_column_infos.size()); } - return columns_to_read[column_index].id; -} - -ColumnDefine getColumnDefineForColumnExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read) -{ - assert(isColumnExpr(expr)); - auto column_index = decodeDAGInt64(expr.val()); - if (column_index < 0 || column_index >= static_cast(columns_to_read.size())) - { - throw TiFlashException( - "Column index out of bound: " + DB::toString(column_index) + ", should in [0," - + DB::toString(columns_to_read.size()) + ")", - Errors::Coprocessor::BadRequest); - } - return columns_to_read[column_index]; + return scan_column_infos[column_index].id; } // convert literal value from timezone specified in cop request to UTC in-place @@ -129,7 +116,7 @@ enum class OperandType inline RSOperatorPtr parseTiCompareExpr( // const tipb::Expr & expr, const FilterParser::RSFilterType filter_type, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, const LoggerPtr & /*log*/) @@ -177,7 +164,7 @@ inline RSOperatorPtr parseTiCompareExpr( // if (expr.children_size() != 2 && child_idx != 0) return createUnsupported(expr.ShortDebugString(), "ColumnRef in In/NotIn is not supported", false); - ColumnID id = getColumnIDForColumnExpr(child, columns_to_read); + ColumnID id = getColumnIDForColumnExpr(child, scan_column_infos); attr = creator(id); if (child_idx == 0) left = OperandType::Column; @@ -293,7 +280,7 @@ inline RSOperatorPtr parseTiCompareExpr( // RSOperatorPtr parseTiExpr( const tipb::Expr & expr, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, const LoggerPtr & log) @@ -328,7 +315,7 @@ RSOperatorPtr parseTiExpr( { const auto & child = expr.children(0); if (likely(isFunctionExpr(child))) - op = createNot(parseTiExpr(child, columns_to_read, creator, timezone_info, log)); + op = createNot(parseTiExpr(child, scan_column_infos, creator, timezone_info, log)); else op = createUnsupported(child.ShortDebugString(), "child of logical not is not function", false); } @@ -343,7 +330,7 @@ RSOperatorPtr parseTiExpr( { const auto & child = expr.children(i); if (likely(isFunctionExpr(child))) - children.emplace_back(parseTiExpr(child, columns_to_read, creator, timezone_info, log)); + children.emplace_back(parseTiExpr(child, scan_column_infos, creator, timezone_info, log)); else children.emplace_back(createUnsupported( child.ShortDebugString(), @@ -365,7 +352,7 @@ RSOperatorPtr parseTiExpr( case FilterParser::RSFilterType::LessEqual: case FilterParser::RSFilterType::In: case FilterParser::RSFilterType::NotIn: - op = parseTiCompareExpr(expr, filter_type, columns_to_read, creator, timezone_info, log); + op = parseTiCompareExpr(expr, filter_type, scan_column_infos, creator, timezone_info, log); break; case FilterParser::RSFilterType::IsNull: @@ -397,7 +384,7 @@ RSOperatorPtr parseTiExpr( false); else { - ColumnID id = getColumnIDForColumnExpr(child, columns_to_read); + ColumnID id = getColumnIDForColumnExpr(child, scan_column_infos); Attr attr = creator(id); op = createIsNull(attr); } @@ -433,13 +420,13 @@ RSOperatorPtr parseTiExpr( inline RSOperatorPtr tryParse( const tipb::Expr & filter, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, const LoggerPtr & log) { if (isFunctionExpr(filter)) - return cop::parseTiExpr(filter, columns_to_read, creator, timezone_info, log); + return cop::parseTiExpr(filter, scan_column_infos, creator, timezone_info, log); else return createUnsupported(filter.ShortDebugString(), "child of logical and is not function", false); } @@ -449,7 +436,7 @@ inline RSOperatorPtr tryParse( RSOperatorPtr FilterParser::parseDAGQuery( const DAGQueryInfo & dag_info, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, FilterParser::AttrCreatorByColumnID && creator, const LoggerPtr & log) { @@ -459,11 +446,11 @@ RSOperatorPtr FilterParser::parseDAGQuery( if (dag_info.filters.size() == 1 && dag_info.pushed_down_filters.empty()) { - op = cop::tryParse(dag_info.filters[0], columns_to_read, creator, dag_info.timezone_info, log); + op = cop::tryParse(dag_info.filters[0], scan_column_infos, creator, dag_info.timezone_info, log); } else if (dag_info.pushed_down_filters.size() == 1 && dag_info.filters.empty()) { - op = cop::tryParse(dag_info.pushed_down_filters[0], columns_to_read, creator, dag_info.timezone_info, log); + op = cop::tryParse(dag_info.pushed_down_filters[0], scan_column_infos, creator, dag_info.timezone_info, log); } else { @@ -472,11 +459,11 @@ RSOperatorPtr FilterParser::parseDAGQuery( children.reserve(dag_info.filters.size() + dag_info.pushed_down_filters.size()); for (const auto & filter : dag_info.filters) { - children.emplace_back(cop::tryParse(filter, columns_to_read, creator, dag_info.timezone_info, log)); + children.emplace_back(cop::tryParse(filter, scan_column_infos, creator, dag_info.timezone_info, log)); } for (const auto & filter : dag_info.pushed_down_filters) { - children.emplace_back(cop::tryParse(filter, columns_to_read, creator, dag_info.timezone_info, log)); + children.emplace_back(cop::tryParse(filter, scan_column_infos, creator, dag_info.timezone_info, log)); } op = createAnd(children); } @@ -486,7 +473,7 @@ RSOperatorPtr FilterParser::parseDAGQuery( RSOperatorPtr FilterParser::parseRFInExpr( const tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, - const ColumnDefines & columns_to_read, + const std::optional & target_attr, const std::set & setElements, const TimezoneInfo & timezone_info) { @@ -497,12 +484,11 @@ RSOperatorPtr FilterParser::parseRFInExpr( { case tipb::IN: { - if (!isColumnExpr(target_expr)) + if (!isColumnExpr(target_expr) || !target_attr) { return createUnsupported(target_expr.ShortDebugString(), "rf target expr is not column expr", false); } - auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read); - auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; + const auto & attr = *target_attr; if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone) { Fields values; @@ -527,6 +513,27 @@ RSOperatorPtr FilterParser::parseRFInExpr( return createUnsupported(target_expr.ShortDebugString(), "function params should be in predicate", false); } +std::optional FilterParser::createAttr( + const tipb::Expr & expr, + const ColumnInfos & scan_column_infos, + const ColumnDefines & table_column_defines) +{ + if (!isColumnExpr(expr)) + { + return std::nullopt; + } + auto col_id = cop::getColumnIDForColumnExpr(expr, scan_column_infos); + auto it = std::find_if( // + table_column_defines.cbegin(), + table_column_defines.cend(), + [col_id](const ColumnDefine & cd) { return cd.id == col_id; }); + if (it != table_column_defines.cend()) + { + return Attr{.col_name = it->name, .col_id = it->id, .type = it->type}; + } + return std::nullopt; +} + bool FilterParser::isRSFilterSupportType(const Int32 field_type) { return cop::isRoughSetFilterSupportType(field_type); diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 2c6a730e24a..2687ac208fb 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -45,7 +45,7 @@ class FilterParser using AttrCreatorByColumnID = std::function; static RSOperatorPtr parseDAGQuery( const DAGQueryInfo & dag_info, - const ColumnDefines & columns_to_read, + const ColumnInfos & scan_column_infos, AttrCreatorByColumnID && creator, const LoggerPtr & log); @@ -53,10 +53,15 @@ class FilterParser static RSOperatorPtr parseRFInExpr( tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, - const ColumnDefines & columns_to_read, + const std::optional & target_attr, const std::set & setElements, const TimezoneInfo & timezone_info); + static std::optional createAttr( + const tipb::Expr & expr, + const ColumnInfos & scan_column_infos, + const ColumnDefines & table_column_defines); + static bool isRSFilterSupportType(Int32 field_type); /// Some helper structure diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp index 77ce2e50062..f53cef162fa 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.cpp @@ -43,7 +43,7 @@ void UnorderedInputStream::pushDownReadyRFList(std::vector rea { for (const RuntimeFilterPtr & rf : readyRFList) { - auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); + auto rs_operator = rf->parseToRSOperator(); task_pool->appendRSOperator(rs_operator); } } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index ce29ff48d67..8cb836db36f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -2176,10 +2176,15 @@ try const ColumnDefines columns_to_read = {ColumnDefine{1, "a", std::make_shared()}, ColumnDefine{2, "b", std::make_shared()}}; + // Only need id of ColumnInfo + TiDB::ColumnInfo a, b; + a.id = 1; + b.id = 2; + ColumnInfos column_infos = {a, b}; auto dag_query = std::make_unique( filters, pushed_down_filters, // Not care now - std::vector{}, // Not care now + column_infos, std::vector{}, 0, context->getTimezoneInfo()); @@ -2193,7 +2198,7 @@ try return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; const auto op - = DB::DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, create_attr_by_column_id, Logger::get()); + = DB::DM::FilterParser::parseDAGQuery(*dag_query, column_infos, create_attr_by_column_id, Logger::get()); ASSERT_EQ(op->toDebugString(), "{\"op\":\"in\",\"col\":\"b\",\"value\":\"[\"1\",\"2\"]}"); } CATCH diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 8056f4cca36..e59ba187ceb 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -730,7 +730,6 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator( const std::unique_ptr & dag_query, - const ColumnDefines & columns_to_read, const Context & context, const LoggerPtr & tracing_logger) { @@ -751,8 +750,11 @@ DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator( // Maybe throw an exception? Or check if `type` is nullptr before creating filter? return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; - rs_operator - = FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); + rs_operator = FilterParser::parseDAGQuery( + *dag_query, + dag_query->source_columns, + std::move(create_attr_by_column_id), + log); if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); } @@ -838,10 +840,14 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter( has_cast) { NamesWithAliases project_cols; - for (size_t i = 0; i < columns_to_read.size(); ++i) + for (size_t i = 0; i < table_scan_column_info.size(); ++i) { - if (filter_col_id_set.contains(columns_to_read[i].id)) - project_cols.emplace_back(casted_columns[i], columns_to_read[i].name); + if (filter_col_id_set.contains(table_scan_column_info[i].id)) + { + auto it = columns_to_read_map.find(table_scan_column_info[i].id); + RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id); + project_cols.emplace_back(casted_columns[i], it->second.name); + } } actions->add(ExpressionAction::project(project_cols)); @@ -902,7 +908,7 @@ DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter( return EMPTY_FILTER; // build rough set operator - const DM::RSOperatorPtr rs_operator = buildRSOperator(dag_query, columns_to_read, context, tracing_logger); + const DM::RSOperatorPtr rs_operator = buildRSOperator(dag_query, context, tracing_logger); // build push down filter const auto & columns_to_read_info = dag_query->source_columns; const auto & pushed_down_filters = dag_query->pushed_down_filters; @@ -1011,6 +1017,11 @@ RuntimeFilteList StorageDeltaMerge::parseRuntimeFilterList( auto runtime_filter_list = db_context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds( query_info.dag_query->runtime_filter_ids); LOG_DEBUG(log, "build runtime filter in local stream, list size:{}", runtime_filter_list.size()); + const ColumnDefines & table_column_defines = getStoreColumnDefines(); + for (auto & rf : runtime_filter_list) + { + rf->setTargetAttr(query_info.dag_query->source_columns, table_column_defines); + } return runtime_filter_list; } diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 6da11ecbc8d..dcb2d9e08e5 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -36,6 +36,8 @@ namespace DB { struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr; +class MockStorage; + namespace DM { struct RowKeyRange; @@ -243,7 +245,6 @@ class StorageDeltaMerge DM::RSOperatorPtr buildRSOperator( const std::unique_ptr & dag_query, - const DM::ColumnDefines & columns_to_read, const Context & context, const LoggerPtr & tracing_logger); /// Get filters from query to construct rough set operation and push down filters. @@ -319,6 +320,8 @@ class StorageDeltaMerge Context & global_context; LoggerPtr log; + + friend class MockStorage; }; diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 5a2ae4235f1..8dd175ae12c 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -485,8 +485,11 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator( return DM::Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; - auto rs_operator - = DM::FilterParser::parseDAGQuery(*dag_query, *columns_to_read, std::move(create_attr_by_column_id), log); + auto rs_operator = DM::FilterParser::parseDAGQuery( + *dag_query, + table_scan.getColumns(), + std::move(create_attr_by_column_id), + log); if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) LOG_DEBUG(log, "Rough set filter: {}", rs_operator->toDebugString()); return rs_operator; diff --git a/dbms/src/Storages/tests/gtest_filter_parser.cpp b/dbms/src/Storages/tests/gtest_filter_parser.cpp index 553d6615d02..50cc6c10a78 100644 --- a/dbms/src/Storages/tests/gtest_filter_parser.cpp +++ b/dbms/src/Storages/tests/gtest_filter_parser.cpp @@ -119,7 +119,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator( return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; }; - return DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); + return DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log); } // Test cases for col and literal diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index 5aac492f592..4bab7d8f809 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -113,7 +113,7 @@ DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( }; auto rs_operator - = DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); + = DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log); auto push_down_filter = StorageDeltaMerge::buildPushDownFilter( rs_operator, table_info.columns, diff --git a/dbms/src/TiDB/Schema/TiDB.h b/dbms/src/TiDB/Schema/TiDB.h index 96f3a0d2fc6..5d8884fc19a 100644 --- a/dbms/src/TiDB/Schema/TiDB.h +++ b/dbms/src/TiDB/Schema/TiDB.h @@ -422,3 +422,13 @@ std::vector toTiDBColumnInfos( const ::google::protobuf::RepeatedPtrField & tipb_column_infos); } // namespace TiDB + +template <> +struct fmt::formatter +{ + template + auto format(const TiDB::ColumnInfo & ci, FormatContext & ctx) const -> decltype(ctx.out()) + { + return fmt::format_to(ctx.out(), "{}", ci.id); + } +}; diff --git a/tests/fullstack-test/expr/generated_columns.test b/tests/fullstack-test/expr/generated_columns.test new file mode 100644 index 00000000000..1e3a15632bf --- /dev/null +++ b/tests/fullstack-test/expr/generated_columns.test @@ -0,0 +1,98 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column e int; + +mysql> insert into test.t(a, c, e) values(1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400), (5, 50, 500), (6, 60, 600), (7, 70, 700), (8, 80, 800), (9, 90, 900); + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 10; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 1 | 2 | 10 | 11 | 100 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 20; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 2 | 3 | 20 | 21 | 200 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 30; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 3 | 4 | 30 | 31 | 300 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 40; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 4 | 5 | 40 | 41 | 400 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 50; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 5 | 6 | 50 | 51 | 500 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 60; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 6 | 7 | 60 | 61 | 600 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 70; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 7 | 8 | 70 | 71 | 700 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 8 | 9 | 80 | 81 | 800 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 8 | 9 | 80 | 81 | 800 | ++------+------+------+------+------+ + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 90; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 9 | 10 | 90 | 91 | 900 | ++------+------+------+------+------+ + +mysql> drop table test.t; diff --git a/tests/fullstack-test/expr/generated_columns2.test b/tests/fullstack-test/expr/generated_columns2.test new file mode 100644 index 00000000000..ded5572b6ab --- /dev/null +++ b/tests/fullstack-test/expr/generated_columns2.test @@ -0,0 +1,49 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column t time(6); + +mysql> insert into test.t(a, c, t) values(1, 2, '000:10:10.123456'), (3, 4, '001:10:10.123456'), (5, 6, '002:10:10.123456'); +mysql> insert into test.t(a, c, t) select a, c, t + 0.001 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.002 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.004 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.008 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.016 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.032 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.064 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.128 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.256 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 0.512 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 1.024 from test.t; +mysql> insert into test.t(a, c, t) select a, c, t + 2.048 from test.t; + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> analyze table test.t; + +mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, hour(t) from test.t where t = '000:10:10.123456'; ++------+------+------+------+---------+ +| a | b | c | d | hour(t) | ++------+------+------+------+---------+ +| 1 | 2 | 2 | 3 | 0 | ++------+------+------+------+---------+ + +mysql> drop table test.t; diff --git a/tests/fullstack-test/expr/runtime_filter.test b/tests/fullstack-test/expr/runtime_filter.test new file mode 100644 index 00000000000..cfbe663c6b6 --- /dev/null +++ b/tests/fullstack-test/expr/runtime_filter.test @@ -0,0 +1,46 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int); +mysql> alter table test.t add column b int as (a+1) virtual; +mysql> alter table test.t add column c int; +mysql> alter table test.t add column d int as (c+1) virtual; +mysql> alter table test.t add column e int; +mysql> insert into test.t(a, c, e) values(1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400), (5, 50, 500), (6, 60, 600), (7, 70, 700), (8, 80, 800), (9, 90, 900); +mysql> alter table test.t set tiflash replica 1; + +mysql> drop table if exists test.t2; +mysql> create table if not exists test.t2(f int); +mysql> insert into test.t2 values(10); +mysql> alter table test.t2 set tiflash replica 1; + +func> wait_table test t +func> wait_table test t2 + +mysql> set tidb_runtime_filter_mode="LOCAL"; set tidb_isolation_read_engines='tiflash'; select /*+ HASH_JOIN_BUILD(test.t2) */ a, c, e from test.t t, test.t2 t2 where t.c = t2.f; ++------+------+------+ +| a | c | e | ++------+------+------+ +| 1 | 10 | 100 | ++------+------+------+ + +mysql> set tidb_runtime_filter_mode="LOCAL"; set tidb_isolation_read_engines='tiflash'; select /*+ HASH_JOIN_BUILD(test.t2) */ a, b, c, d, e from test.t t, test.t2 t2 where t.c = t2.f; ++------+------+------+------+------+ +| a | b | c | d | e | ++------+------+------+------+------+ +| 1 | 2 | 10 | 11 | 100 | ++------+------+------+------+------+ + +mysql> drop table test.t;