Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Fix obtaining incorrect column information when there are virtual columns in the query (#9189) #9206

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ bool RuntimeFilter::updateStatus(RuntimeFilterStatus status_, const std::string
return true;
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_to_read)
void RuntimeFilter::setTargetAttr(
const DM::ColumnInfos & scan_column_infos,
const DM::ColumnDefines & table_column_defines)
{
target_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines);
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator()
{
switch (rf_type)
{
Expand All @@ -216,7 +223,7 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
return DM::FilterParser::parseRFInExpr(
rf_type,
target_expr,
columns_to_read,
target_attr,
in_values_set->getUniqueSetElements(),
timezone_info);
case tipb::MIN_MAX:
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Columns/IColumn.h>
#include <Interpreters/Set.h>
#include <Storages/DeltaMerge/ColumnDefine_fwd.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -47,7 +48,7 @@ class RuntimeFilter
}
source_expr = rf_pb.source_expr_list().Get(0);
target_expr = rf_pb.target_expr_list().Get(0);
};
}

std::string getSourceColumnName() const;

Expand Down Expand Up @@ -77,7 +78,8 @@ class RuntimeFilter

bool await(int64_t ms_remaining);

DM::RSOperatorPtr parseToRSOperator(DM::ColumnDefines & columns_to_read);
void setTargetAttr(const DM::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines);
DM::RSOperatorPtr parseToRSOperator();

const int id;

Expand All @@ -86,6 +88,7 @@ class RuntimeFilter

tipb::Expr source_expr;
tipb::Expr target_expr;
std::optional<DM::Attr> target_attr;
const tipb::RuntimeFilterType rf_type;
TimezoneInfo timezone_info;
// thread safe
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -226,10 +227,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand All @@ -256,10 +258,11 @@ void MockStorage::buildExecFromDeltaMerge(
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -288,10 +291,11 @@ void MockStorage::buildExecFromDeltaMerge(
else
{
static const google::protobuf::RepeatedPtrField<tipb::Expr> empty_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
empty_pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
context.getTimezoneInfo());
Expand Down Expand Up @@ -575,6 +579,11 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
return table_infos_for_delta_merge[name];
}

DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id)
{
return storage_delta_merge_map[table_id]->getStoreColumnDefines();
}

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
ColumnID col_id = 0;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ColumnDefine_fwd.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>

Expand Down Expand Up @@ -145,6 +146,7 @@ class MockStorage

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);
DM::ColumnDefines getStoreColumnDefines(Int64 table_id);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RFWaitTask : public Task
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool);
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ void PhysicalMockTableScan::buildPipelineExecGroupImpl(
{
if (auto * source_op = dynamic_cast<UnorderedSourceOp *>(group_builder.getCurBuilder(i).source_op.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
source_op->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
Expand Down Expand Up @@ -232,11 +231,23 @@ void PhysicalMockTableScan::buildRuntimeFilterInLocalStream(Context & context)
{
if (auto * p_stream = dynamic_cast<DM::UnorderedInputStream *>(local_stream.get()))
{
auto runtime_filter_list
= context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
auto runtime_filter_list = getRuntimeFilterList(context);
// todo config max wait time
p_stream->setRuntimeFilterInfo(runtime_filter_list, rf_max_wait_time_ms);
}
}
}

RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context)
{
auto mock_column_infos = context.mockStorage()->getTableSchemaForDeltaMerge(table_id);
auto column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos);
auto column_defines = context.mockStorage()->getStoreColumnDefines(table_id);
auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
for (auto & rf : rfs)
{
rf->setTargetAttr(column_infos, column_defines);
}
return rfs;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Planner/Plans/PhysicalLeaf.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -70,6 +71,8 @@ class PhysicalMockTableScan : public PhysicalLeaf

void buildRuntimeFilterInLocalStream(Context & context);

RuntimeFilteList getRuntimeFilterList(Context & context);

private:
FilterConditions filter_conditions;
Block sample_block;
Expand Down
36 changes: 36 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/KVStore/Types.h>

#include <memory>
#include <unordered_map>
#include <vector>

namespace TiDB
{
struct ColumnInfo;
}

namespace DB::DM
{
struct ColumnDefine;
using ColumnDefines = std::vector<ColumnDefine>;
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
using ColumnDefineMap = std::unordered_map<DB::ColumnID, ColumnDefine>;

using ColumnInfos = std::vector<TiDB::ColumnInfo>;
} // namespace DB::DM
Loading