diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 0e2341a6e77..9db555e20ff 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -580,6 +580,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) return table_infos_for_delta_merge[name]; } +DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id) +{ + return storage_delta_merge_map[table_id]->getStoreColumnDefines(); +} + ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos) { ColumnID col_id = 0; diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index 090cd36f37f..8cda78df16b 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -146,6 +147,7 @@ class MockStorage TableInfo getTableInfo(const String & name); TableInfo getTableInfoForDeltaMerge(const String & name); + DM::ColumnDefines getStoreColumnDefines(Int64 table_id); size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan); diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index 8554e434818..c10d983e81b 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl( { if (auto * source_op = dynamic_cast(group_builder.getCurBuilder(i).source_op.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context) { if (auto * p_stream = dynamic_cast(local_stream.get())) { - auto runtime_filter_list - = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + auto runtime_filter_list = getRuntimeFilterList(context); // todo config max wait time p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms); } } } + +RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context) +{ + auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id); + auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos); + auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id); + auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); + for (auto & rf : rfs) + { + rf->setTargetAttr(column_infos, column_defines); + } + return rfs; +} } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 778b77e5932..4e305314d3f 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf void buildRuntimeFilterInLocalStream(Context & context); + RuntimeFilteList getRuntimeFilterList(Context & context); + private: FilterConditions filter_conditions; Block sample_block;