Skip to content

Commit

Permalink
Storages: Fix the issue of obtaining incorrect column information when there are virtual columns in the query
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Jul 6, 2024
1 parent cc1bf74 commit 01fd012
Show file tree
Hide file tree
Showing 24 changed files with 346 additions and 65 deletions.
11 changes: 9 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string
return true;
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read)
/// Build the attribute for this filter's target expression and store it in `target_attr`.
///
/// @param scan_column_infos     column infos handed to DM::FilterParser::createAttr
/// @param table_column_defines  column defines handed to DM::FilterParser::createAttr
void RuntimeFilter::setTargetAttr(
    const DM::ColumnInfos & scan_column_infos,
    const DM::ColumnDefines & table_column_defines)
{
    // The attribute is derived from `target_expr` together with both column lists.
    this->target_attr = DM::FilterParser::createAttr(
        target_expr,
        scan_column_infos,
        table_column_defines);
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator()
{
switch (rf_type)
{
Expand All @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
return DM::FilterParser::parseRFInExpr(
rf_type,
target_expr,
columns_to_read,
target_attr,
in_values_set->getUniqueSetElements(),
timezone_info);
case tipb::MIN_MAX:
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RuntimeFilter
}
source_expr = rf_pb.source_expr_list().Get(0);
target_expr = rf_pb.target_expr_list().Get(0);
};
}

std::string getSourceColumnName() const;

Expand Down Expand Up @@ -77,7 +77,8 @@ class RuntimeFilter

bool await(int64_t ms_remaining);

DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read);
void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines);
DM::RSOperatorPtr parseToRSOperator();

const int id;

Expand All @@ -86,6 +87,7 @@ class RuntimeFilter

tipb::Expr source_expr;
tipb::Expr target_expr;
std::optional<DM::Attr> target_attr;
const tipb::RuntimeFilterType rf_type;
TimezoneInfo timezone_info;
// thread safe
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -227,10 +228,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -257,10 +259,11 @@ void MockStorage::buildExecFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -289,10 +292,11 @@ void MockStorage::buildExecFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -576,6 +580,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
return table_infos_for_delta_merge[name];
}

/// Return the store column defines of the DeltaMerge storage registered under `table_id`.
///
/// The lookup uses `at()` instead of `operator[]` so that an unknown `table_id` is
/// reported as an error (`std::out_of_range` for standard map types) rather than
/// default-inserting an empty entry and dereferencing it.
///
/// @param table_id  id of the mock DeltaMerge table
/// @return the result of `getStoreColumnDefines()` on the registered storage
DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id)
{
    // NOTE(review): assumes `storage_delta_merge_map` is a standard associative
    // container (e.g. std::unordered_map) that provides `at()` — confirm against the header.
    return storage_delta_merge_map.at(table_id)->getStoreColumnDefines();
}

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
ColumnID col_id = 0;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ColumnDefine_fwd.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>

Expand Down Expand Up @@ -146,6 +147,7 @@ class MockStorage

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);
DM::ColumnDefines getStoreColumnDefines(Int64 table_id);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RFWaitTask : public Task
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool);
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl(
{
if (auto * source_op = dynamic_cast<UnorderedSourceOp *>(group_builder.getCurBuilder(i).source_op.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
Expand Down Expand Up @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context)
{
if (auto * p_stream = dynamic_cast<DM::UnorderedInputStream *>(local_stream.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
}
}

/// Fetch the local runtime filters referenced by `runtime_filter_ids` and call
/// `setTargetAttr` on each of them before returning the list.
///
/// @param context  provides the mock storage and the DAG context's runtime filter manager
/// @return the runtime filters, each with its target attribute already set
RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context)
{
    // Column infos are converted from the mock schema registered for this table.
    const auto scan_column_infos
        = mockColumnInfosToTiDBColumnInfos(context.mockStorage()->getTableSchemaForDeltaMerge(table_id));
    // Column defines come from the mock storage for the same table.
    const auto store_column_defines = context.mockStorage()->getStoreColumnDefines(table_id);

    auto runtime_filters
        = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
    for (const auto & filter : runtime_filters)
        filter->setTargetAttr(scan_column_infos, store_column_defines);
    return runtime_filters;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Planner/Plans/PhysicalLeaf.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf

void buildRuntimeFilterInLocalStream(Context & context);

RuntimeFilteList getRuntimeFilterList(Context & context);

private:
FilterConditions filter_conditions;
Block sample_block;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
#include <unordered_map>
#include <vector>

// Forward declaration only: lets this header name TiDB::ColumnInfo without
// including its full definition.
namespace TiDB
{
struct ColumnInfo;
}

// Forward declarations and container aliases for DeltaMerge column metadata.
// Both element types are only forward-declared in this header.
// NOTE(review): code that instantiates or uses these containers presumably has
// to include the full definitions itself — confirm.
namespace DB::DM
{
struct ColumnDefine;
// Sequence of column defines.
using ColumnDefines = std::vector<ColumnDefine>;
// Shared-ownership handle to a ColumnDefines sequence.
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
// Column defines keyed by DB::ColumnID.
using ColumnDefineMap = std::unordered_map<DB::ColumnID, ColumnDefine>;

// Sequence of TiDB column infos.
using ColumnInfos = std::vector<TiDB::ColumnInfo>;
} // namespace DB::DM
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ PushDownFilterPtr PushDownFilter::build(
has_cast)
{
NamesWithAliases project_cols;
for (size_t i = 0; i < columns_to_read.size(); ++i)
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (filter_col_id_set.contains(columns_to_read[i].id))
project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
if (filter_col_id_set.contains(table_scan_column_info[i].id))
{
auto it = columns_to_read_map.find(table_scan_column_info[i].id);
RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id);
project_cols.emplace_back(casted_columns[i], it->second.name);
}
}
actions->add(ExpressionAction::project(project_cols));

Expand Down Expand Up @@ -162,15 +166,15 @@ PushDownFilterPtr PushDownFilter::build(
if (unlikely(dag_query == nullptr))
return EMPTY_FILTER;

const auto & columns_to_read_info = dag_query->source_columns;
// build rough set operator
const auto rs_operator = RSOperator::build(
dag_query,
columns_to_read,
columns_to_read_info,
table_column_defines,
context.getSettingsRef().dt_enable_rough_set_filter,
tracing_logger);
// build push down filter
const auto & columns_to_read_info = dag_query->source_columns;
const auto & pushed_down_filters = dag_query->pushed_down_filters;
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
{
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ RSOperatorPtr createUnsupported(const String & reason)

RSOperatorPtr RSOperator::build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger)
Expand All @@ -75,8 +75,11 @@ RSOperatorPtr RSOperator::build(
// Maybe throw an exception? Or check if `type` is nullptr before creating filter?
return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};
DM::RSOperatorPtr rs_operator
= FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), tracing_logger);
auto rs_operator = FilterParser::parseDAGQuery(
*dag_query,
scan_column_infos,
std::move(create_attr_by_column_id),
tracing_logger);
if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString());

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class RSOperator

static RSOperatorPtr build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger);
Expand Down
Loading

0 comments on commit 01fd012

Please sign in to comment.