Skip to content

Commit

Permalink
ci
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Jul 5, 2024
1 parent b1a2069 commit 446513a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
return table_infos_for_delta_merge[name];
}

DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id)
{
return storage_delta_merge_map[table_id]->getStoreColumnDefines();
}

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
ColumnID col_id = 0;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ColumnDefine_fwd.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>

Expand Down Expand Up @@ -146,6 +147,7 @@ class MockStorage

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);
DM::ColumnDefines getStoreColumnDefines(Int64 table_id);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl(
{
if (auto * source_op = dynamic_cast<UnorderedSourceOp *>(group_builder.getCurBuilder(i).source_op.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
Expand Down Expand Up @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context)
{
if (auto * p_stream = dynamic_cast<DM::UnorderedInputStream *>(local_stream.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
}
}

RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context)
{
auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id);
auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos);
auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id);
auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
for (auto & rf : rfs)
{
rf->setTargetAttr(column_infos, column_defines);
}
return rfs;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Planner/Plans/PhysicalLeaf.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf

void buildRuntimeFilterInLocalStream(Context & context);

RuntimeFilteList getRuntimeFilterList(Context & context);

private:
FilterConditions filter_conditions;
Block sample_block;
Expand Down

0 comments on commit 446513a

Please sign in to comment.