Skip to content

Commit

Permalink
Storages: Fix the issue of obtaining incorrect column information whe…
Browse files Browse the repository at this point in the history
…n there are virtual columns in the query
  • Loading branch information
JinheLin committed Jul 5, 2024
1 parent 82d20fe commit 3951444
Show file tree
Hide file tree
Showing 20 changed files with 317 additions and 61 deletions.
11 changes: 9 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string
return true;
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read)
/// Resolve the runtime filter's target expression into a rough-set attribute
/// and remember it in `target_attr` for later use by `parseToRSOperator()`.
///
/// @param scan_column_infos     column infos of the table scan; the target
///                              column expression is resolved against this list.
/// @param table_column_defines  column definitions of the table, used to look
///                              up the name and type of the resolved column.
///
/// The resolution itself is delegated to `DM::FilterParser::createAttr`; when
/// that returns an empty optional, `target_attr` is left empty as well.
void RuntimeFilter::setTargetAttr(
    const DM::ColumnInfos & scan_column_infos,
    const DM::ColumnDefines & table_column_defines)
{
    auto resolved_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines);
    target_attr = std::move(resolved_attr);
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator()
{
switch (rf_type)
{
Expand All @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
return DM::FilterParser::parseRFInExpr(
rf_type,
target_expr,
columns_to_read,
target_attr,
in_values_set->getUniqueSetElements(),
timezone_info);
case tipb::MIN_MAX:
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RuntimeFilter
}
source_expr = rf_pb.source_expr_list().Get(0);
target_expr = rf_pb.target_expr_list().Get(0);
};
}

std::string getSourceColumnName() const;

Expand Down Expand Up @@ -77,7 +77,8 @@ class RuntimeFilter

bool await(int64_t ms_remaining);

DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read);
// Resolve `target_expr` against the scan column infos and the table column
// defines, and store the result in `target_attr`.
// Presumably must be called before `parseToRSOperator()` - confirm against callers.
void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines);
DM::RSOperatorPtr parseToRSOperator();

const int id;

Expand All @@ -86,6 +87,7 @@ class RuntimeFilter

tipb::Expr source_expr;
tipb::Expr target_expr;
std::optional<DM::Attr> target_attr;
const tipb::RuntimeFilterType rf_type;
TimezoneInfo timezone_info;
// thread safe
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -227,10 +228,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -257,10 +259,11 @@ void MockStorage::buildExecFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -289,10 +292,11 @@ void MockStorage::buildExecFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RFWaitTask : public Task
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool);
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
#include <unordered_map>
#include <vector>

// Forward declaration only: lets this header name `TiDB::ColumnInfo` without
// including its full definition. Code that accesses members of a ColumnInfo
// must include the defining header itself.
namespace TiDB
{
struct ColumnInfo;
}

// Forward declarations and container aliases for DeltaMerge column metadata.
namespace DB::DM
{
// Defined elsewhere; only declared here.
struct ColumnDefine;
// Sequence of column definitions.
using ColumnDefines = std::vector<ColumnDefine>;
// Shared-ownership handle to a ColumnDefines vector.
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
// Column definitions keyed by column id.
using ColumnDefineMap = std::unordered_map<DB::ColumnID, ColumnDefine>;

// Sequence of TiDB column infos (element type is forward-declared above).
using ColumnInfos = std::vector<TiDB::ColumnInfo>;
} // namespace DB::DM
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ PushDownFilterPtr PushDownFilter::build(
has_cast)
{
NamesWithAliases project_cols;
for (size_t i = 0; i < columns_to_read.size(); ++i)
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (filter_col_id_set.contains(columns_to_read[i].id))
project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
if (filter_col_id_set.contains(table_scan_column_info[i].id))
{
auto it = columns_to_read_map.find(table_scan_column_info[i].id);
RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id);
project_cols.emplace_back(casted_columns[i], it->second.name);
}
}
actions->add(ExpressionAction::project(project_cols));

Expand Down Expand Up @@ -162,15 +166,15 @@ PushDownFilterPtr PushDownFilter::build(
if (unlikely(dag_query == nullptr))
return EMPTY_FILTER;

const auto & columns_to_read_info = dag_query->source_columns;
// build rough set operator
const auto rs_operator = RSOperator::build(
dag_query,
columns_to_read,
columns_to_read_info,
table_column_defines,
context.getSettingsRef().dt_enable_rough_set_filter,
tracing_logger);
// build push down filter
const auto & columns_to_read_info = dag_query->source_columns;
const auto & pushed_down_filters = dag_query->pushed_down_filters;
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
{
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ RSOperatorPtr createUnsupported(const String & reason)

RSOperatorPtr RSOperator::build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger)
Expand All @@ -75,8 +75,11 @@ RSOperatorPtr RSOperator::build(
// Maybe throw an exception? Or check if `type` is nullptr before creating filter?
return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};
DM::RSOperatorPtr rs_operator
= FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), tracing_logger);
auto rs_operator = FilterParser::parseDAGQuery(
*dag_query,
scan_column_infos,
std::move(create_attr_by_column_id),
tracing_logger);
if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString());

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class RSOperator

static RSOperatorPtr build(
const std::unique_ptr<DAGQueryInfo> & dag_query,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines,
bool enable_rs_filter,
const LoggerPtr & tracing_logger);
Expand Down
60 changes: 40 additions & 20 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ inline bool isRoughSetFilterSupportType(const Int32 field_type)
return false;
}

ColumnDefine getColumnDefineForColumnExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read)
ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnInfos & scan_column_infos)
{
assert(isColumnExpr(expr));
auto column_index = decodeDAGInt64(expr.val());
if (column_index < 0 || column_index >= static_cast<Int64>(columns_to_read.size()))
if (column_index < 0 || column_index >= static_cast<Int64>(scan_column_infos.size()))
{
throw TiFlashException(
Errors::Coprocessor::BadRequest,
"Column index out of bound: {}, should in [0,{})",
column_index,
columns_to_read.size());
scan_column_infos.size());
}
return columns_to_read[column_index];
return scan_column_infos[column_index].id;
}

// convert literal value from timezone specified in cop request to UTC in-place
Expand All @@ -110,7 +110,7 @@ inline void convertFieldWithTimezone(Field & value, const TimezoneInfo & timezon
inline RSOperatorPtr parseTiCompareExpr( //
const tipb::Expr & expr,
const FilterParser::RSFilterType filter_type,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info)
{
Expand Down Expand Up @@ -163,8 +163,8 @@ inline RSOperatorPtr parseTiCompareExpr( //
tipb::ScalarFuncSig_Name(expr.sig()),
field_type));

const auto col = getColumnDefineForColumnExpr(child, columns_to_read);
attr = creator(col.id);
auto col_id = getColumnIDForColumnExpr(child, scan_column_infos);
attr = creator(col_id);
}
else if (isLiteralExpr(child))
{
Expand Down Expand Up @@ -243,7 +243,7 @@ inline RSOperatorPtr parseTiCompareExpr( //

RSOperatorPtr parseTiExpr(
const tipb::Expr & expr,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
const LoggerPtr & log)
Expand Down Expand Up @@ -273,7 +273,7 @@ RSOperatorPtr parseTiExpr(
fmt::format("logical not with {} children is not supported", expr.children_size()));

if (const auto & child = expr.children(0); likely(isFunctionExpr(child)))
return createNot(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
return createNot(parseTiExpr(child, scan_column_infos, creator, timezone_info, log));
else
return createUnsupported(fmt::format(
"child of logical not is not function, child_type={}",
Expand All @@ -288,7 +288,7 @@ RSOperatorPtr parseTiExpr(
for (const auto & child : expr.children())
{
if (likely(isFunctionExpr(child)))
children.emplace_back(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
children.emplace_back(parseTiExpr(child, scan_column_infos, creator, timezone_info, log));
else
children.emplace_back(createUnsupported(fmt::format(
"child of logical operator is not function, child_type={}",
Expand All @@ -307,7 +307,7 @@ RSOperatorPtr parseTiExpr(
case FilterParser::RSFilterType::Less:
case FilterParser::RSFilterType::LessEqual:
case FilterParser::RSFilterType::In:
return parseTiCompareExpr(expr, filter_type, columns_to_read, creator, timezone_info);
return parseTiCompareExpr(expr, filter_type, scan_column_infos, creator, timezone_info);

case FilterParser::RSFilterType::IsNull:
{
Expand All @@ -330,8 +330,8 @@ RSOperatorPtr parseTiExpr(
auto field_type = child.field_type().tp();
if (isRoughSetFilterSupportType(field_type))
{
const auto col = getColumnDefineForColumnExpr(child, columns_to_read);
Attr attr = creator(col.id);
auto col_id = getColumnIDForColumnExpr(child, scan_column_infos);
auto attr = creator(col_id);
return createIsNull(attr);
}
return createUnsupported(
Expand All @@ -358,7 +358,7 @@ RSOperatorPtr parseTiExpr(

RSOperatorPtr FilterParser::parseDAGQuery(
const DAGQueryInfo & dag_info,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
FilterParser::AttrCreatorByColumnID && creator,
const LoggerPtr & log)
{
Expand All @@ -367,11 +367,11 @@ RSOperatorPtr FilterParser::parseDAGQuery(
children.reserve(dag_info.filters.size() + dag_info.pushed_down_filters.size());
for (const auto & filter : dag_info.filters)
{
children.emplace_back(cop::parseTiExpr(filter, columns_to_read, creator, dag_info.timezone_info, log));
children.emplace_back(cop::parseTiExpr(filter, scan_column_infos, creator, dag_info.timezone_info, log));
}
for (const auto & filter : dag_info.pushed_down_filters)
{
children.emplace_back(cop::parseTiExpr(filter, columns_to_read, creator, dag_info.timezone_info, log));
children.emplace_back(cop::parseTiExpr(filter, scan_column_infos, creator, dag_info.timezone_info, log));
}

if (children.empty())
Expand All @@ -385,19 +385,18 @@ RSOperatorPtr FilterParser::parseDAGQuery(
RSOperatorPtr FilterParser::parseRFInExpr(
const tipb::RuntimeFilterType rf_type,
const tipb::Expr & target_expr,
const ColumnDefines & columns_to_read,
const std::optional<Attr> & target_attr,
const std::set<Field> & setElements,
const TimezoneInfo & timezone_info)
{
switch (rf_type)
{
case tipb::IN:
{
if (!isColumnExpr(target_expr))
if (!isColumnExpr(target_expr) || !target_attr)
return createUnsupported(
fmt::format("rf target expr is not column expr, expr.tp={}", tipb::ExprType_Name(target_expr.tp())));
auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read);
auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type};
const auto & attr = *target_attr;
if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone)
{
Fields values;
Expand All @@ -421,6 +420,27 @@ RSOperatorPtr FilterParser::parseRFInExpr(
}
}

/// Build the rough-set `Attr` for the column referenced by `expr`.
///
/// The column id is obtained from `scan_column_infos` through
/// `cop::getColumnIDForColumnExpr` (which throws if the column index encoded
/// in `expr` is out of bound), and then looked up in `table_column_defines`
/// to get the column's name and type.
///
/// @return the attribute of the first entry in `table_column_defines` whose id
///         matches; `std::nullopt` if `expr` is not a column expression or no
///         entry matches.
std::optional<Attr> FilterParser::createAttr(
    const tipb::Expr & expr,
    const ColumnInfos & scan_column_infos,
    const ColumnDefines & table_column_defines)
{
    if (!isColumnExpr(expr))
        return std::nullopt;

    const auto target_id = cop::getColumnIDForColumnExpr(expr, scan_column_infos);
    for (const auto & define : table_column_defines)
    {
        if (define.id == target_id)
            return Attr{.col_name = define.name, .col_id = define.id, .type = define.type};
    }
    return std::nullopt;
}

bool FilterParser::isRSFilterSupportType(const Int32 field_type)
{
return cop::isRoughSetFilterSupportType(field_type);
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,23 @@ class FilterParser
using AttrCreatorByColumnID = std::function<Attr(const DB::ColumnID)>;
static RSOperatorPtr parseDAGQuery(
const DAGQueryInfo & dag_info,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
AttrCreatorByColumnID && creator,
const LoggerPtr & log);

// only for runtime filter in predicate
static RSOperatorPtr parseRFInExpr(
tipb::RuntimeFilterType rf_type,
const tipb::Expr & target_expr,
const ColumnDefines & columns_to_read,
const std::optional<Attr> & target_attr,
const std::set<Field> & setElements,
const TimezoneInfo & timezone_info);

// Create the Attr (name, id, type) of the column referenced by `expr`.
// The column id is resolved through `scan_column_infos`; name and type are
// taken from the matching entry of `table_column_defines`.
// Returns std::nullopt if `expr` is not a column expression or the resolved
// column id is not present in `table_column_defines`.
static std::optional<Attr> createAttr(
const tipb::Expr & expr,
const ColumnInfos & scan_column_infos,
const ColumnDefines & table_column_defines);

static bool isRSFilterSupportType(Int32 field_type);

/// Some helper structure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void UnorderedInputStream::pushDownReadyRFList(std::vector<RuntimeFilterPtr> rea
{
for (const RuntimeFilterPtr & rf : readyRFList)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
}
}
Expand Down
Loading

0 comments on commit 3951444

Please sign in to comment.