Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Fix obtaining incorrect column information when there are virtual columns in the query #9189

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string
return true;
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read)
void RuntimeFilter::setTargetAttr(
const DM::ColumnInfos & scan_column_infos,
const DM::ColumnDefines & table_column_defines)
{
target_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines);
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator()
{
switch (rf_type)
{
Expand All @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
return DM::FilterParser::parseRFInExpr(
rf_type,
target_expr,
columns_to_read,
target_attr,
in_values_set->getUniqueSetElements(),
timezone_info);
case tipb::MIN_MAX:
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RuntimeFilter
}
source_expr = rf_pb.source_expr_list().Get(0);
target_expr = rf_pb.target_expr_list().Get(0);
};
}

std::string getSourceColumnName() const;

Expand Down Expand Up @@ -77,7 +77,8 @@ class RuntimeFilter

bool await(int64_t ms_remaining);

DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read);
void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines);
DM::RSOperatorPtr parseToRSOperator();

const int id;

Expand All @@ -86,6 +87,7 @@ class RuntimeFilter

tipb::Expr source_expr;
tipb::Expr target_expr;
std::optional<DM::Attr> target_attr;
const tipb::RuntimeFilterType rf_type;
TimezoneInfo timezone_info;
// thread safe
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only local const references prolong the lifespan a temporary.
ref: https://stackoverflow.com/questions/2784262/

As a const reference member of DAGQueryInfo, source_columns cannot prolong the lifespan of a temporary.
DAGQueryInfo::source_columns will be a dangle reference after the contructor of DAGQueryInfo exits.

scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -227,10 +228,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -257,10 +259,11 @@ void MockStorage::buildExecFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -289,10 +292,11 @@ void MockStorage::buildExecFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -576,6 +580,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
return table_infos_for_delta_merge[name];
}

DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id)
{
return storage_delta_merge_map[table_id]->getStoreColumnDefines();
}

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
ColumnID col_id = 0;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ColumnDefine_fwd.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>

Expand Down Expand Up @@ -146,6 +147,7 @@ class MockStorage

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);
DM::ColumnDefines getStoreColumnDefines(Int64 table_id);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RFWaitTask : public Task
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool);
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl(
{
if (auto * source_op = dynamic_cast<UnorderedSourceOp *>(group_builder.getCurBuilder(i).source_op.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
Expand Down Expand Up @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context)
{
if (auto * p_stream = dynamic_cast<DM::UnorderedInputStream *>(local_stream.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
}
}

RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context)
{
auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id);
auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos);
auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id);
auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
for (auto & rf : rfs)
{
rf->setTargetAttr(column_infos, column_defines);
}
return rfs;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Planner/Plans/PhysicalLeaf.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf

void buildRuntimeFilterInLocalStream(Context & context);

RuntimeFilteList getRuntimeFilterList(Context & context);

private:
FilterConditions filter_conditions;
Block sample_block;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
#include <unordered_map>
#include <vector>

namespace TiDB
{
struct ColumnInfo;
}

namespace DB::DM
{
struct ColumnDefine;
using ColumnDefines = std::vector<ColumnDefine>;
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
using ColumnDefineMap = std::unordered_map<DB::ColumnID, ColumnDefine>;

using ColumnInfos = std::vector<TiDB::ColumnInfo>;
} // namespace DB::DM
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ PushDownFilterPtr PushDownFilter::build(
has_cast)
{
NamesWithAliases project_cols;
for (size_t i = 0; i < columns_to_read.size(); ++i)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

casted_columns align with table_scan_column_info one by one.

for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (filter_col_id_set.contains(columns_to_read[i].id))
project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
if (filter_col_id_set.contains(table_scan_column_info[i].id))
{
auto it = columns_to_read_map.find(table_scan_column_info[i].id);
RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id);
project_cols.emplace_back(casted_columns[i], it->second.name);
}
}
actions->add(ExpressionAction::project(project_cols));

Expand Down Expand Up @@ -162,15 +166,15 @@ PushDownFilterPtr PushDownFilter::build(
if (unlikely(dag_query == nullptr))
return EMPTY_FILTER;

const auto & columns_to_read_info = dag_query->source_columns;
// build rough set operator
const auto rs_operator = RSOperator::build(
dag_query,
columns_to_read,
columns_to_read_info,
table_column_defines,
context.getSettingsRef().dt_enable_rough_set_filter,
tracing_logger);
// build push down filter
const auto & columns_to_read_info = dag_query->source_columns;
const auto & pushed_down_filters = dag_query->pushed_down_filters;
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
{
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ RSOperatorPtr createUnsupported(const String & reason)

RSOperatorPtr RSOperator::build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger)
Expand All @@ -75,8 +75,11 @@ RSOperatorPtr RSOperator::build(
// Maybe throw an exception? Or check if `type` is nullptr before creating filter?
return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};
DM::RSOperatorPtr rs_operator
= FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), tracing_logger);
auto rs_operator = FilterParser::parseDAGQuery(
*dag_query,
scan_column_infos,
std::move(create_attr_by_column_id),
tracing_logger);
if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString());

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class RSOperator

static RSOperatorPtr build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger);
Expand Down
Loading