Skip to content

Commit

Permalink
Storages: Fix obtaining incorrect column information when there are v…
Browse files Browse the repository at this point in the history
…irtual columns in the query (release-7.1) (#9267)

close #9188

Use the original columns  in `query_info.dag_query` instead of `columns_to_read` when building `RSOperator`.

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: Lloyd-Pottiger <[email protected]>
Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
3 people authored Aug 9, 2024
1 parent 2b7dedc commit 0c42e75
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 40 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters{};
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
pushed_down_filters, // Not care now
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
scan_column_infos,
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions->conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Encryption/tests/gtest_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ TEST(WriteLimiterTest, Rate)
thread.join();
auto elapsed = watch.elapsedSeconds();
auto actual_rate = write_limiter->getTotalBytesThrough() / elapsed;
// make sure that 0.8 * target <= actual_rate <= 1.25 * target
// hint: the range [0.8, 1.25] is copied from rocksdb,
// if tests fail, try to enlarge this range.
EXPECT_GE(actual_rate / target, 0.80)
<< fmt::format("actual_rate={} target={} elapsed={:.3f}s", actual_rate, target, elapsed);
// For environments with high loads, latency can be very large.
// In theory, the upper bound of `elapsed` cannot be guaranteed.
// So that we cannot guarantee the lower bound of `actual_rate`.
// EXPECT_GE(actual_rate / target, 0.75)
// << fmt::format("actual_rate={} target={} elapsed={:.3f}s", actual_rate, target, elapsed);
EXPECT_LE(actual_rate / target, 1.30)
<< fmt::format("actual_rate={} target={} elapsed={:.3f}s", actual_rate, target, elapsed);
}
Expand Down
36 changes: 18 additions & 18 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ inline bool isRoughSetFilterSupportType(const Int32 field_type)
return false;
}

ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read)
ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const ColumnInfos & scan_column_infos)
{
assert(isColumnExpr(expr));
auto column_index = decodeDAGInt64(expr.val());
if (column_index < 0 || column_index >= static_cast<Int64>(columns_to_read.size()))
if (column_index < 0 || column_index >= static_cast<Int64>(scan_column_infos.size()))
{
throw TiFlashException("Column index out of bound: " + DB::toString(column_index) + ", should in [0,"
+ DB::toString(columns_to_read.size()) + ")",
+ DB::toString(scan_column_infos.size()) + ")",
Errors::Coprocessor::BadRequest);
}
return columns_to_read[column_index].id;
return scan_column_infos[column_index].id;
}

enum class OperandType
Expand All @@ -103,7 +103,7 @@ enum class OperandType
inline RSOperatorPtr parseTiCompareExpr( //
const tipb::Expr & expr,
const FilterParser::RSFilterType filter_type,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
const LoggerPtr & /*log*/)
Expand Down Expand Up @@ -141,7 +141,7 @@ inline RSOperatorPtr parseTiCompareExpr( //
"ColumnRef with field type(" + DB::toString(field_type) + ") is not supported",
false);

ColumnID id = getColumnIDForColumnExpr(child, columns_to_read);
ColumnID id = getColumnIDForColumnExpr(child, scan_column_infos);
attr = creator(id);
if (child_idx == 0)
left = OperandType::Column;
Expand Down Expand Up @@ -244,7 +244,7 @@ inline RSOperatorPtr parseTiCompareExpr( //
}

RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
const LoggerPtr & log)
Expand Down Expand Up @@ -278,7 +278,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
{
const auto & child = expr.children(0);
if (likely(isFunctionExpr(child)))
op = createNot(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
op = createNot(parseTiExpr(child, scan_column_infos, creator, timezone_info, log));
else
op = createUnsupported(child.ShortDebugString(), "child of logical not is not function", false);
}
Expand All @@ -293,7 +293,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
{
const auto & child = expr.children(i);
if (likely(isFunctionExpr(child)))
children.emplace_back(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
children.emplace_back(parseTiExpr(child, scan_column_infos, creator, timezone_info, log));
else
children.emplace_back(createUnsupported(child.ShortDebugString(), "child of logical operator is not function", false));
}
Expand All @@ -310,7 +310,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
case FilterParser::RSFilterType::GreaterEqual:
case FilterParser::RSFilterType::Less:
case FilterParser::RSFilterType::LessEqual:
op = parseTiCompareExpr(expr, filter_type, columns_to_read, creator, timezone_info, log);
op = parseTiCompareExpr(expr, filter_type, scan_column_infos, creator, timezone_info, log);
break;

case FilterParser::RSFilterType::IsNull:
Expand Down Expand Up @@ -342,7 +342,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
false);
else
{
ColumnID id = getColumnIDForColumnExpr(child, columns_to_read);
ColumnID id = getColumnIDForColumnExpr(child, scan_column_infos);
Attr attr = creator(id);
op = createIsNull(attr);
}
Expand Down Expand Up @@ -373,13 +373,13 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
}

inline RSOperatorPtr tryParse(const tipb::Expr & filter,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
const LoggerPtr & log)
{
if (isFunctionExpr(filter))
return cop::parseTiExpr(filter, columns_to_read, creator, timezone_info, log);
return cop::parseTiExpr(filter, scan_column_infos, creator, timezone_info, log);
else
return createUnsupported(filter.ShortDebugString(), "child of logical and is not function", false);
}
Expand All @@ -388,7 +388,7 @@ inline RSOperatorPtr tryParse(const tipb::Expr & filter,


RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
FilterParser::AttrCreatorByColumnID && creator,
const LoggerPtr & log)
{
Expand All @@ -398,11 +398,11 @@ RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info,

if (dag_info.filters.size() == 1 && dag_info.pushed_down_filters.empty())
{
op = cop::tryParse(dag_info.filters[0], columns_to_read, creator, dag_info.timezone_info, log);
op = cop::tryParse(dag_info.filters[0], scan_column_infos, creator, dag_info.timezone_info, log);
}
else if (dag_info.pushed_down_filters.size() == 1 && dag_info.filters.empty())
{
op = cop::tryParse(dag_info.pushed_down_filters[0], columns_to_read, creator, dag_info.timezone_info, log);
op = cop::tryParse(dag_info.pushed_down_filters[0], scan_column_infos, creator, dag_info.timezone_info, log);
}
else
{
Expand All @@ -411,11 +411,11 @@ RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info,
children.reserve(dag_info.filters.size() + dag_info.pushed_down_filters.size());
for (const auto & filter : dag_info.filters)
{
children.emplace_back(cop::tryParse(filter, columns_to_read, creator, dag_info.timezone_info, log));
children.emplace_back(cop::tryParse(filter, scan_column_infos, creator, dag_info.timezone_info, log));
}
for (const auto & filter : dag_info.pushed_down_filters)
{
children.emplace_back(cop::tryParse(filter, columns_to_read, creator, dag_info.timezone_info, log));
children.emplace_back(cop::tryParse(filter, scan_column_infos, creator, dag_info.timezone_info, log));
}
op = createAnd(children);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class FilterParser
using AttrCreatorByColumnID = std::function<Attr(const DB::ColumnID)>;
static RSOperatorPtr parseDAGQuery(
const DAGQueryInfo & dag_info,
const ColumnDefines & columns_to_read,
const ColumnInfos & scan_column_infos,
AttrCreatorByColumnID && creator,
const LoggerPtr & log);

Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,6 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo(
}

DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator(const SelectQueryInfo & query_info,
const ColumnDefines & columns_to_read,
const Context & context,
const LoggerPtr & tracing_logger)
{
Expand All @@ -729,7 +728,7 @@ DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator(const SelectQueryInfo & que
// Maybe throw an exception? Or check if `type` is nullptr before creating filter?
return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};
rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, columns_to_read, std::move(create_attr_by_column_id), log);
rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, query_info.dag_query->source_columns, std::move(create_attr_by_column_id), log);
}
if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString());
Expand Down Expand Up @@ -821,11 +820,13 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
if (auto [has_cast, casted_columns] = analyzer->buildExtraCastsAfterTS(actions, need_cast_column, table_scan_column_info); has_cast)
{
NamesWithAliases project_cols;
for (size_t i = 0; i < columns_to_read.size(); ++i)
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (filter_col_id_set.contains(columns_to_read[i].id))
if (filter_col_id_set.contains(table_scan_column_info[i].id))
{
project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
auto it = columns_to_read_map.find(table_scan_column_info[i].id);
RUNTIME_CHECK(it != columns_to_read_map.end(), table_scan_column_info[i].id);
project_cols.emplace_back(casted_columns[i], it->second.name);
}
}
actions->add(ExpressionAction::project(project_cols));
Expand Down Expand Up @@ -871,8 +872,7 @@ DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter(const SelectQueryIn
const LoggerPtr & tracing_logger)
{
// build rough set operator
DM::RSOperatorPtr rs_operator = buildRSOperator(query_info, columns_to_read, context, tracing_logger);

const DM::RSOperatorPtr rs_operator = buildRSOperator(query_info, context, tracing_logger);
// build push down filter
const auto & pushed_down_filters = query_info.dag_query != nullptr ? query_info.dag_query->pushed_down_filters : google::protobuf::RepeatedPtrField<tipb::Expr>{};
const auto & columns_to_read_info = query_info.dag_query != nullptr ? query_info.dag_query->source_columns : ColumnInfos{};
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace DB
{
struct CheckpointInfo;
using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>;

namespace DM
{
struct RowKeyRange;
Expand Down Expand Up @@ -234,10 +235,10 @@ class StorageDeltaMerge
bool dataDirExist();
void shutdownImpl();

DM::RSOperatorPtr buildRSOperator(const SelectQueryInfo & query_info,
const DM::ColumnDefines & columns_to_read,
const Context & context,
const LoggerPtr & tracing_logger);
DM::RSOperatorPtr buildRSOperator(
const SelectQueryInfo & query_info,
const Context & context,
const LoggerPtr & tracing_logger);
/// Get filters from query to construct rough set operation and push down filters.
DM::PushDownFilterPtr parsePushDownFilter(const SelectQueryInfo & query_info,
const DM::ColumnDefines & columns_to_read,
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,11 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator(
return DM::Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type};
return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};
auto rs_operator = DM::FilterParser::parseDAGQuery(*dag_query, *columns_to_read, std::move(create_attr_by_column_id), log);
auto rs_operator = DM::FilterParser::parseDAGQuery(
*dag_query,
table_scan.getColumns(),
std::move(create_attr_by_column_id),
log);
if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
LOG_DEBUG(log, "Rough set filter: {}", rs_operator->toDebugString());
return rs_operator;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/tests/gtest_filter_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator(const String table_info_j
return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};

return DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log);
return DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log);
}

// Test cases for col and literal
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,15 @@ DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter(const Stri
return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
};

auto rs_operator = DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log);
auto push_down_filter = StorageDeltaMerge::buildPushDownFilter(rs_operator, table_info.columns, pushed_down_filters, columns_to_read, *ctx, log);
auto rs_operator
= DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log);
auto push_down_filter = StorageDeltaMerge::buildPushDownFilter(
rs_operator,
table_info.columns,
pushed_down_filters,
columns_to_read,
*ctx,
log);
return push_down_filter;
}

Expand Down
98 changes: 98 additions & 0 deletions tests/fullstack-test/expr/generated_columns.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mysql> drop table if exists test.t;
mysql> create table if not exists test.t(a int);
mysql> alter table test.t add column b int as (a+1) virtual;
mysql> alter table test.t add column c int;
mysql> alter table test.t add column d int as (c+1) virtual;
mysql> alter table test.t add column e int;

mysql> insert into test.t(a, c, e) values(1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400), (5, 50, 500), (6, 60, 600), (7, 70, 700), (8, 80, 800), (9, 90, 900);

mysql> alter table test.t set tiflash replica 1;

func> wait_table test t

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 10;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 1 | 2 | 10 | 11 | 100 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 20;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 2 | 3 | 20 | 21 | 200 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 30;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 3 | 4 | 30 | 31 | 300 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 40;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 4 | 5 | 40 | 41 | 400 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 50;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 5 | 6 | 50 | 51 | 500 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 60;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 6 | 7 | 60 | 61 | 600 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 70;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 7 | 8 | 70 | 71 | 700 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 8 | 9 | 80 | 81 | 800 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 80;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 8 | 9 | 80 | 81 | 800 |
+------+------+------+------+------+

mysql> set tidb_isolation_read_engines='tiflash'; select a, b, c, d, e from test.t where c = 90;
+------+------+------+------+------+
| a | b | c | d | e |
+------+------+------+------+------+
| 9 | 10 | 90 | 91 | 900 |
+------+------+------+------+------+

mysql> drop table test.t;
Loading

0 comments on commit 0c42e75

Please sign in to comment.