From 41cae7ed208b4fa3099b4bbdbe8e69d67c87769b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 19 Nov 2019 12:12:20 +0800 Subject: [PATCH 01/13] simple parser --- dbms/CMakeLists.txt | 1 + .../Storages/DeltaMerge/DeltaMergeStore.cpp | 3 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 1 + .../Storages/DeltaMerge/Filter/RSOperator.cpp | 2 +- .../Storages/DeltaMerge/Filter/RSOperator.h | 17 +- .../Storages/DeltaMerge/Filter/Unsupported.h | 8 +- .../DeltaMerge/FilterParser/FilterParser.h | 60 ++ .../FilterParser/FilterParser_ast.cpp | 136 +++ .../FilterParser/FilterParser_dag.cpp | 781 ++++++++++++++++++ dbms/src/Storages/Page/PageFile.cpp | 8 +- dbms/src/Storages/StorageDeltaMerge.cpp | 33 +- 11 files changed, 1033 insertions(+), 17 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h create mode 100644 dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp create mode 100644 dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 286cc4cf37b..3baa1484adf 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -57,6 +57,7 @@ add_headers_and_sources(dbms src/Storages) add_headers_and_sources(dbms src/Storages/DeltaMerge) add_headers_and_sources(dbms src/Storages/DeltaMerge/Index) add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter) +add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser) add_headers_and_sources(dbms src/Storages/Distributed) add_headers_and_sources(dbms src/Storages/MergeTree) add_headers_and_sources(dbms src/Storages/Transaction) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 12ce6bd8d56..3376ec3c8cd 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -323,6 +323,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, const HandleRanges & sorted_ranges, 
size_t num_streams, UInt64 max_version, + const RSOperatorPtr & filter, size_t expected_block_size) { SegmentReadTasks tasks; @@ -417,7 +418,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, task.read_snapshot, *storage_snapshot, task.ranges, - {}, + filter, max_version, std::max(expected_block_size, STABLE_CHUNK_ROWS)); }; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 043fde899c6..57a7b5d0019 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -189,6 +189,7 @@ class DeltaMergeStore : private boost::noncopyable const HandleRanges & sorted_ranges, size_t num_streams, UInt64 max_version, + const RSOperatorPtr & filter, size_t expected_block_size = STABLE_CHUNK_ROWS); /// Force flush all data to disk. diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index a9d8afcec36..38eabad922e 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -33,7 +33,7 @@ RSOperatorPtr createNotEqual(const Attr & attr, const Field & value) RSOperatorPtr createNotIn(const Attr & attr, const Fields & values) { return std::make_shared(attr, values); } RSOperatorPtr createNotLike(const Attr & attr, const Field & value) { return std::make_shared(attr, value); } RSOperatorPtr createOr(const RSOperators & children) { return std::make_shared(children); } -RSOperatorPtr createUnsupported(const String & content, bool is_not) { return std::make_shared(content, is_not); } +RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not) { return std::make_shared(content, reason, is_not); } // clang-format on } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h index 
c465835d419..e2885c0e85f 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -90,20 +90,25 @@ class LogicalOp : public RSOperator return Some; +// logical +RSOperatorPtr createNot(const RSOperatorPtr & op); +RSOperatorPtr createOr(const RSOperators & children); RSOperatorPtr createAnd(const RSOperators & children); +// compare RSOperatorPtr createEqual(const Attr & attr, const Field & value); +RSOperatorPtr createNotEqual(const Attr & attr, const Field & value); RSOperatorPtr createGreater(const Attr & attr, const Field & value, int null_direction); RSOperatorPtr createGreaterEqual(const Attr & attr, const Field & value, int null_direction); -RSOperatorPtr createIn(const Attr & attr, const Fields & values); RSOperatorPtr createLess(const Attr & attr, const Field & value, int null_direction); RSOperatorPtr createLessEqual(const Attr & attr, const Field & value, int null_direction); -RSOperatorPtr createLike(const Attr & attr, const Field & value); -RSOperatorPtr createNot(const RSOperatorPtr & op); -RSOperatorPtr createNotEqual(const Attr & attr, const Field & value); +// set +RSOperatorPtr createIn(const Attr & attr, const Fields & values); RSOperatorPtr createNotIn(const Attr & attr, const Fields & values); +// +RSOperatorPtr createLike(const Attr & attr, const Field & value); RSOperatorPtr createNotLike(const Attr & attr, const Field & values); -RSOperatorPtr createOr(const RSOperators & children); -RSOperatorPtr createUnsupported(const String & content, bool is_not); +// +RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not); } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h index d51167e4ce9..c7fa324bb15 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h +++ b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h @@ -11,23 +11,25 @@ namespace DM class Unsupported : 
public RSOperator { String content; + String reason; bool is_not; public: - Unsupported(const String & content_) : content(content_), is_not(false) {} - Unsupported(const String & content_, bool is_not_) : content(content_), is_not(is_not_) {} + Unsupported(const String & content_, const String & reason_) : Unsupported(content_, reason_, false) {} + Unsupported(const String & content_, const String & reason_, bool is_not_) : content(content_), reason(reason_), is_not(is_not_) {} String name() override { return "unsupported"; } String toString() override { return R"({"op":")" + name() + // + R"(","reason":")" + reason + // R"(","content":")" + content + // R"(","is_not":")" + DB::toString(is_not) + "\"}"; } RSResult roughCheck(const RSCheckParam & /*param*/) override { return Some; } - RSOperatorPtr applyNot() override { return createUnsupported(content, !is_not); }; + RSOperatorPtr applyNot() override { return createUnsupported(content, reason, !is_not); }; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h new file mode 100644 index 00000000000..7a542b25246 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include +#include + +namespace DB +{ + +class ASTSelectQuery; + +struct DAGQueryInfo; + +namespace DM +{ + +class FilterParser +{ +public: + /// From ast. + using AttrCreatorByColumnName = std::function; + static RSOperatorPtr parseSelectQuery(const ASTSelectQuery & query, Poco::Logger * log); + +public: + /// From dag. 
+ using AttrCreatorByColumnID = std::function; + static RSOperatorPtr parseDAGQuery(const DAGQueryInfo & dag_info, AttrCreatorByColumnID && creator, Poco::Logger * log); + + /// Some helper structur + + enum RSFilterType + { + // logical + Not = 0, + Or, + And, + // compare + Equal, + NotEqual, + Greater, + GreaterEqual, + Less, + LessEuqal, + + In, + NotIn, + + Like, + NotLike, + + Unsupported = 254, + }; + + static std::unordered_map scalar_func_rs_filter_map; +}; + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp new file mode 100644 index 00000000000..37579fdfb9e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp @@ -0,0 +1,136 @@ +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int COP_BAD_DAG_REQUEST; +} // namespace ErrorCodes + +namespace DM +{ +namespace ast +{ + +RSOperatorPtr parseRSOperator(const ASTFunction * const func, Poco::Logger * log) +{ + assert(func != nullptr); + RSOperatorPtr op = EMPTY_FILTER; + + + if (func->name == "equals") + { + auto * lhs = static_cast(func->arguments->children[0].get()); + assert(lhs != nullptr); + assert(lhs->kind == ASTIdentifier::Kind::Column); + Attr attr{lhs->name, 0, DataTypeFactory::instance().get("Int32")}; + auto * rhs = static_cast(func->arguments->children[1].get()); + assert(rhs != nullptr); + op = createEqual(attr, rhs->value); + } + else if (func->name == "or" || func->name == "and") + { + RSOperators children; + for (const auto & child : func->arguments->children) + { + ASTFunction * sub_func = static_cast(child.get()); + assert(sub_func != nullptr); + children.emplace_back(parseRSOperator(sub_func, log)); + } + op = createOr(children); + } + else if (func->name == "not") + { + assert(func->arguments->children.size() == 1); + ASTFunction 
* sub_func = static_cast(func->arguments->children[0].get()); + assert(sub_func != nullptr); + RSOperatorPtr sub_op = parseRSOperator(sub_func, log); + op = createNot(sub_op); + } +#if 0 + else if (func->name == "notEquals") + { + op = createEqual(); + } + else if (func->name == "greater") + { + op = createGreater(); + } + else if (func->name == "greaterOrEquals") + { + op = createGreaterEqual(); + } + else if (func->name == "less") + { + op = createLess(); + } + else if (func->name == "lessOrEquals") + { + op = createLessEqual(); + } + else if (func->name == "in") + { + + } + else if (func->name == "like") + { + + } + else if (func->name == "notLike") + { + + } +#endif + else + { + std::stringstream ss; + func->dumpTree(ss); + op = createUnsupported(ss.str(), "Function " + func->name + " is not supported", false); + } + + return op; +} + +} // namespace ast + +RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, Poco::Logger * log) +{ + RSOperatorPtr op = EMPTY_FILTER; + if (!query.where_expression) + return op; + + const ASTFunction * where = static_cast(query.where_expression.get()); + if (!where) + { + std::stringstream ss; + query.where_expression->dumpTree(ss); + LOG_WARNING(log, String("Where expression is not ASTFunction, can not parse to rough set index. 
Expr: ") + ss.str()); + return op; + } + + std::stringstream ss; + where->dumpTree(ss); + std::string expr_tree = ss.str(); + LOG_TRACE(log, " where expr: " << expr_tree); + + op = ast::parseRSOperator(where, log); + + return op; +} + +} // namespace DM + +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp new file mode 100644 index 00000000000..d55e7763081 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp @@ -0,0 +1,781 @@ +#include + +#include + +#include + +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int COP_BAD_DAG_REQUEST; +} // namespace ErrorCodes + +namespace DM +{ + +namespace cop +{ + +ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const std::vector & input_col_ids) +{ + assert(isColumnExpr(expr)); + auto column_index = decodeDAGInt64(expr.val()); + if (column_index < 0 || column_index >= static_cast(input_col_ids.size())) + { + throw Exception("Column index out of bound: " + DB::toString(column_index) + ", should in [0," + DB::toString(input_col_ids.size()) + + ")", + ErrorCodes::COP_BAD_DAG_REQUEST); + } + return input_col_ids[column_index]; +} + +inline RSOperatorPtr parseTiCompareExpr( // + const tipb::Expr & expr, + const FilterParser::RSFilterType filter_type, + const std::vector & input_col_ids, + const FilterParser::AttrCreatorByColumnID & creator, + Poco::Logger * /* log */) +{ + if (unlikely(expr.children_size() != 2)) + return createUnsupported(expr.DebugString(), + tipb::ScalarFuncSig_Name(expr.sig()) + " with " + DB::toString(expr.children_size()) + + " children is not supported", + false); + + /// Only support `column` `op` `constant` now. 
+ + Attr attr; + Field value; + UInt32 state = 0x0; + constexpr UInt32 state_has_column = 0x1; + constexpr UInt32 state_has_literal = 0x2; + constexpr UInt32 state_finish = state_has_column | state_has_literal; + for (const auto & child : expr.children()) + { + if (isColumnExpr(child)) + { + state |= state_has_column; + ColumnID id = getColumnIDForColumnExpr(child, input_col_ids); + attr = creator(id); + } + else if (isLiteralExpr(child)) + { + state |= state_has_literal; + value = decodeLiteral(child); + } + } + + if (unlikely(state != state_finish)) + return createUnsupported( + expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " with state " + DB::toString(state) + " is not supported", false); + else + { + RSOperatorPtr op; + switch (filter_type) + { + case FilterParser::RSFilterType::Equal: + op = createEqual(attr, value); + break; + case FilterParser::RSFilterType::NotEqual: + op = createNotEqual(attr, value); + break; + case FilterParser::RSFilterType::Greater: + op = createGreater(attr, value, -1); + break; + case FilterParser::RSFilterType::GreaterEqual: + op = createGreaterEqual(attr, value, -1); + break; + case FilterParser::RSFilterType::Less: + op = createLess(attr, value, -1); + break; + case FilterParser::RSFilterType::LessEuqal: + op = createLessEqual(attr, value, -1); + break; + default: + op = createUnsupported(expr.DebugString(), "Unknown compare type: " + tipb::ExprType_Name(expr.tp()), false); + break; + } + return op; + } +} + +RSOperatorPtr parseTiExpr(const tipb::Expr & expr, + const std::vector & input_colids, + const FilterParser::AttrCreatorByColumnID & creator, + Poco::Logger * log) +{ + assert(isFunctionExpr(expr)); + + RSOperatorPtr op = EMPTY_FILTER; + if (unlikely(isAggFunctionExpr(expr))) + { + op = createUnsupported(expr.DebugString(), "agg function: " + tipb::ExprType_Name(expr.tp()), false); + return op; + } + + + if (auto iter = FilterParser::scalar_func_rs_filter_map.find(expr.sig()); iter != 
FilterParser::scalar_func_rs_filter_map.end()) + { + FilterParser::RSFilterType filter_type = iter->second; + switch (filter_type) + { + case FilterParser::RSFilterType::Not: + { + if (unlikely(expr.children_size() != 1)) + op = createUnsupported(expr.DebugString(), "logical not with " + DB::toString(expr.children_size()) + " children", false); + else + { + const auto & child = expr.children(0); + if (likely(isFunctionExpr(child))) + op = createNot(parseTiExpr(child, input_colids, creator, log)); + else + op = createUnsupported(child.DebugString(), "child of logical not is not function", false); + } + } + break; + + case FilterParser::RSFilterType::And: + case FilterParser::RSFilterType::Or: + { + RSOperators children; + for (Int32 i = 0; i < expr.children_size(); ++i) + { + const auto & child = expr.children(i); + if (likely(isFunctionExpr(child))) + children.emplace_back(parseTiExpr(child, input_colids, creator, log)); + else + children.emplace_back(createUnsupported(child.DebugString(), "child of logical operator is not function", false)); + } + if (expr.sig() == tipb::ScalarFuncSig::LogicalAnd) + op = createAnd(children); + else + op = createOr(children); + } + break; + + case FilterParser::RSFilterType::Equal: + case FilterParser::RSFilterType::NotEqual: + case FilterParser::RSFilterType::Greater: + case FilterParser::RSFilterType::GreaterEqual: + case FilterParser::RSFilterType::Less: + case FilterParser::RSFilterType::LessEuqal: + op = parseTiCompareExpr(expr, filter_type, input_colids, creator, log); + break; + + case FilterParser::RSFilterType::In: + case FilterParser::RSFilterType::NotIn: + case FilterParser::RSFilterType::Like: + case FilterParser::RSFilterType::NotLike: + case FilterParser::RSFilterType::Unsupported: + op = createUnsupported(expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported", false); + break; + } + } + else + { + op = createUnsupported(expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " is not 
supported", false); + } + + return op; +} + +} // namespace cop + + +RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info, FilterParser::AttrCreatorByColumnID && creator, Poco::Logger * log) +{ + RSOperatorPtr op = EMPTY_FILTER; + if (!dag_info.dag.hasSelection() || dag_info.dag.getSelection().conditions_size() == 0) + return op; + + std::vector column_ids; + for (const tipb::ColumnInfo & col : dag_info.dag.getTS().columns()) + { + ColumnID cid = col.column_id(); + if (cid == -1) + // Column ID -1 means TiDB expects no specific column, mostly it is for cases like `select count(*)`. + continue; + + column_ids.emplace_back(cid); + } + + const auto & selection = dag_info.dag.getSelection(); + if (likely(selection.conditions_size() == 1)) + op = cop::parseTiExpr(selection.conditions(0), column_ids, creator, log); + else + // This should not happen + op = createUnsupported(selection.DebugString(), "tipb::Selection with multiple conditions", false); + return op; +} + +std::unordered_map FilterParser::scalar_func_rs_filter_map{ + /* + {tipb::ScalarFuncSig::CastIntAsInt, "cast"}, + {tipb::ScalarFuncSig::CastIntAsReal, "cast"}, + {tipb::ScalarFuncSig::CastIntAsString, "cast"}, + {tipb::ScalarFuncSig::CastIntAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastIntAsTime, "cast"}, + {tipb::ScalarFuncSig::CastIntAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastIntAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastRealAsInt, "cast"}, + {tipb::ScalarFuncSig::CastRealAsReal, "cast"}, + {tipb::ScalarFuncSig::CastRealAsString, "cast"}, + {tipb::ScalarFuncSig::CastRealAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastRealAsTime, "cast"}, + {tipb::ScalarFuncSig::CastRealAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastRealAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastDecimalAsInt, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsReal, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsString, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsDecimal, "cast"}, + 
{tipb::ScalarFuncSig::CastDecimalAsTime, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastStringAsInt, "cast"}, + {tipb::ScalarFuncSig::CastStringAsReal, "cast"}, + {tipb::ScalarFuncSig::CastStringAsString, "cast"}, + {tipb::ScalarFuncSig::CastStringAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastStringAsTime, "cast"}, + {tipb::ScalarFuncSig::CastStringAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastStringAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastTimeAsInt, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsReal, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsString, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsTime, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastDurationAsInt, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsReal, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsString, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsTime, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastJsonAsInt, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsReal, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsString, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsTime, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsJson, "cast"}, + + {tipb::ScalarFuncSig::CoalesceInt, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceReal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceString, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceTime, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceJson, "coalesce"}, + */ + + 
{tipb::ScalarFuncSig::LTInt, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTReal, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTString, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTDecimal, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTTime, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTDuration, FilterParser::RSFilterType::Less}, + {tipb::ScalarFuncSig::LTJson, FilterParser::RSFilterType::Less}, + + {tipb::ScalarFuncSig::LEInt, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LEReal, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LEString, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LEDecimal, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LETime, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LEDuration, FilterParser::RSFilterType::LessEuqal}, + {tipb::ScalarFuncSig::LEJson, FilterParser::RSFilterType::LessEuqal}, + + {tipb::ScalarFuncSig::GTInt, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTReal, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTString, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTDecimal, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTTime, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTDuration, FilterParser::RSFilterType::Greater}, + {tipb::ScalarFuncSig::GTJson, FilterParser::RSFilterType::Greater}, + + // {tipb::ScalarFuncSig::GreatestInt, "greatest"}, + // {tipb::ScalarFuncSig::GreatestReal, "greatest"}, + // {tipb::ScalarFuncSig::GreatestString, "greatest"}, + // {tipb::ScalarFuncSig::GreatestDecimal, "greatest"}, + // {tipb::ScalarFuncSig::GreatestTime, "greatest"}, + + // {tipb::ScalarFuncSig::LeastInt, "least"}, + // {tipb::ScalarFuncSig::LeastReal, "least"}, + // {tipb::ScalarFuncSig::LeastString, "least"}, + // {tipb::ScalarFuncSig::LeastDecimal, "least"}, + // 
{tipb::ScalarFuncSig::LeastTime, "least"}, + + //{tipb::ScalarFuncSig::IntervalInt, "cast"}, + //{tipb::ScalarFuncSig::IntervalReal, "cast"}, + + {tipb::ScalarFuncSig::GEInt, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GEReal, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GEString, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GEDecimal, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GETime, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GEDuration, FilterParser::RSFilterType::GreaterEqual}, + {tipb::ScalarFuncSig::GEJson, FilterParser::RSFilterType::GreaterEqual}, + + {tipb::ScalarFuncSig::EQInt, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQReal, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQString, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQDecimal, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQTime, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQDuration, FilterParser::RSFilterType::Equal}, + {tipb::ScalarFuncSig::EQJson, FilterParser::RSFilterType::Equal}, + + {tipb::ScalarFuncSig::NEInt, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NEReal, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NEString, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NEDecimal, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NETime, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NEDuration, FilterParser::RSFilterType::NotEqual}, + {tipb::ScalarFuncSig::NEJson, FilterParser::RSFilterType::NotEqual}, + + //{tipb::ScalarFuncSig::NullEQInt, "cast"}, + //{tipb::ScalarFuncSig::NullEQReal, "cast"}, + //{tipb::ScalarFuncSig::NullEQString, "cast"}, + //{tipb::ScalarFuncSig::NullEQDecimal, "cast"}, + //{tipb::ScalarFuncSig::NullEQTime, "cast"}, + //{tipb::ScalarFuncSig::NullEQDuration, "cast"}, + //{tipb::ScalarFuncSig::NullEQJson, 
"cast"}, + + // {tipb::ScalarFuncSig::PlusReal, "plus"}, + // {tipb::ScalarFuncSig::PlusDecimal, "plus"}, + // {tipb::ScalarFuncSig::PlusInt, "plus"}, + + // {tipb::ScalarFuncSig::MinusReal, "minus"}, + // {tipb::ScalarFuncSig::MinusDecimal, "minus"}, + // {tipb::ScalarFuncSig::MinusInt, "minus"}, + + // {tipb::ScalarFuncSig::MultiplyReal, "multiply"}, + // {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"}, + // {tipb::ScalarFuncSig::MultiplyInt, "multiply"}, + + // {tipb::ScalarFuncSig::DivideReal, "divide"}, + // {tipb::ScalarFuncSig::DivideDecimal, "divide"}, + // {tipb::ScalarFuncSig::IntDivideInt, "intDiv"}, + // {tipb::ScalarFuncSig::IntDivideDecimal, "divide"}, + + // {tipb::ScalarFuncSig::ModReal, "modulo"}, + // {tipb::ScalarFuncSig::ModDecimal, "modulo"}, + // {tipb::ScalarFuncSig::ModInt, "modulo"}, + + // {tipb::ScalarFuncSig::MultiplyIntUnsigned, "multiply"}, + + // {tipb::ScalarFuncSig::AbsInt, "abs"}, + // {tipb::ScalarFuncSig::AbsUInt, "abs"}, + // {tipb::ScalarFuncSig::AbsReal, "abs"}, + // {tipb::ScalarFuncSig::AbsDecimal, "abs"}, + + // {tipb::ScalarFuncSig::CeilIntToDec, "ceil"}, + // {tipb::ScalarFuncSig::CeilIntToInt, "ceil"}, + // {tipb::ScalarFuncSig::CeilDecToInt, "ceil"}, + // {tipb::ScalarFuncSig::CeilDecToDec, "ceil"}, + // {tipb::ScalarFuncSig::CeilReal, "ceil"}, + + // {tipb::ScalarFuncSig::FloorIntToDec, "floor"}, + // {tipb::ScalarFuncSig::FloorIntToInt, "floor"}, + // {tipb::ScalarFuncSig::FloorDecToInt, "floor"}, + // {tipb::ScalarFuncSig::FloorDecToDec, "floor"}, + // {tipb::ScalarFuncSig::FloorReal, "floor"}, + + //{tipb::ScalarFuncSig::RoundReal, "round"}, + //{tipb::ScalarFuncSig::RoundInt, "round"}, + //{tipb::ScalarFuncSig::RoundDec, "round"}, + //{tipb::ScalarFuncSig::RoundWithFracReal, "cast"}, + //{tipb::ScalarFuncSig::RoundWithFracInt, "cast"}, + //{tipb::ScalarFuncSig::RoundWithFracDec, "cast"}, + + //{tipb::ScalarFuncSig::Log1Arg, "log"}, + //{tipb::ScalarFuncSig::Log2Args, "cast"}, + //{tipb::ScalarFuncSig::Log2, 
"log2"}, + //{tipb::ScalarFuncSig::Log10, "log10"}, + + //{tipb::ScalarFuncSig::Rand, "rand"}, + //{tipb::ScalarFuncSig::RandWithSeed, "cast"}, + + //{tipb::ScalarFuncSig::Pow, "pow"}, + //{tipb::ScalarFuncSig::Conv, "cast"}, + //{tipb::ScalarFuncSig::CRC32, "cast"}, + //{tipb::ScalarFuncSig::Sign, "cast"}, + + //{tipb::ScalarFuncSig::Sqrt, "sqrt"}, + //{tipb::ScalarFuncSig::Acos, "acos"}, + //{tipb::ScalarFuncSig::Asin, "asin"}, + //{tipb::ScalarFuncSig::Atan1Arg, "atan"}, + //{tipb::ScalarFuncSig::Atan2Args, "cast"}, + //{tipb::ScalarFuncSig::Cos, "cos"}, + //{tipb::ScalarFuncSig::Cot, "cast"}, + //{tipb::ScalarFuncSig::Degrees, "cast"}, + //{tipb::ScalarFuncSig::Exp, "exp"}, + //{tipb::ScalarFuncSig::PI, "cast"}, + //{tipb::ScalarFuncSig::Radians, "cast"}, + // {tipb::ScalarFuncSig::Sin, "sin"}, + // {tipb::ScalarFuncSig::Tan, "tan"}, + // {tipb::ScalarFuncSig::TruncateInt, "trunc"}, + // {tipb::ScalarFuncSig::TruncateReal, "trunc"}, + //{tipb::ScalarFuncSig::TruncateDecimal, "cast"}, + + {tipb::ScalarFuncSig::LogicalAnd, FilterParser::RSFilterType::And}, + {tipb::ScalarFuncSig::LogicalOr, FilterParser::RSFilterType::Or}, + // {tipb::ScalarFuncSig::LogicalXor, "xor"}, + {tipb::ScalarFuncSig::UnaryNotDecimal, FilterParser::RSFilterType::Not}, + {tipb::ScalarFuncSig::UnaryNotInt, FilterParser::RSFilterType::Not}, + {tipb::ScalarFuncSig::UnaryNotReal, FilterParser::RSFilterType::Not}, + + // {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, + // {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, + // {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, + + // {tipb::ScalarFuncSig::DecimalIsNull, "isNull"}, + // {tipb::ScalarFuncSig::DurationIsNull, "isNull"}, + // {tipb::ScalarFuncSig::RealIsNull, "isNull"}, + // {tipb::ScalarFuncSig::StringIsNull, "isNull"}, + // {tipb::ScalarFuncSig::TimeIsNull, "isNull"}, + // {tipb::ScalarFuncSig::IntIsNull, "isNull"}, + // {tipb::ScalarFuncSig::JsonIsNull, "isNull"}, + + //{tipb::ScalarFuncSig::BitAndSig, "cast"}, + 
//{tipb::ScalarFuncSig::BitOrSig, "cast"}, + //{tipb::ScalarFuncSig::BitXorSig, "cast"}, + //{tipb::ScalarFuncSig::BitNegSig, "cast"}, + //{tipb::ScalarFuncSig::IntIsTrue, "cast"}, + //{tipb::ScalarFuncSig::RealIsTrue, "cast"}, + //{tipb::ScalarFuncSig::DecimalIsTrue, "cast"}, + //{tipb::ScalarFuncSig::IntIsFalse, "cast"}, + //{tipb::ScalarFuncSig::RealIsFalse, "cast"}, + //{tipb::ScalarFuncSig::DecimalIsFalse, "cast"}, + + //{tipb::ScalarFuncSig::LeftShift, "cast"}, + //{tipb::ScalarFuncSig::RightShift, "cast"}, + + //{tipb::ScalarFuncSig::BitCount, "cast"}, + //{tipb::ScalarFuncSig::GetParamString, "cast"}, + //{tipb::ScalarFuncSig::GetVar, "cast"}, + //{tipb::ScalarFuncSig::RowSig, "cast"}, + //{tipb::ScalarFuncSig::SetVar, "cast"}, + //{tipb::ScalarFuncSig::ValuesDecimal, "cast"}, + //{tipb::ScalarFuncSig::ValuesDuration, "cast"}, + //{tipb::ScalarFuncSig::ValuesInt, "cast"}, + //{tipb::ScalarFuncSig::ValuesJSON, "cast"}, + //{tipb::ScalarFuncSig::ValuesReal, "cast"}, + //{tipb::ScalarFuncSig::ValuesString, "cast"}, + //{tipb::ScalarFuncSig::ValuesTime, "cast"}, + + // {tipb::ScalarFuncSig::InInt, "in"}, + // {tipb::ScalarFuncSig::InReal, "in"}, + // {tipb::ScalarFuncSig::InString, "in"}, + // {tipb::ScalarFuncSig::InDecimal, "in"}, + // {tipb::ScalarFuncSig::InTime, "in"}, + // {tipb::ScalarFuncSig::InDuration, "in"}, + // {tipb::ScalarFuncSig::InJson, "in"}, + + // {tipb::ScalarFuncSig::IfNullInt, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullReal, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullString, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullTime, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullDuration, "ifNull"}, + // {tipb::ScalarFuncSig::IfNullJson, "ifNull"}, + + // {tipb::ScalarFuncSig::IfInt, "if"}, + // {tipb::ScalarFuncSig::IfReal, "if"}, + // {tipb::ScalarFuncSig::IfString, "if"}, + // {tipb::ScalarFuncSig::IfDecimal, "if"}, + // {tipb::ScalarFuncSig::IfTime, "if"}, + // 
{tipb::ScalarFuncSig::IfDuration, "if"}, + // {tipb::ScalarFuncSig::IfJson, "if"}, + + //todo need further check for caseWithExpression and multiIf + //{tipb::ScalarFuncSig::CaseWhenInt, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenReal, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenString, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenDecimal, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenTime, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenDuration, "caseWithExpression"}, + //{tipb::ScalarFuncSig::CaseWhenJson, "caseWithExpression"}, + + //{tipb::ScalarFuncSig::AesDecrypt, "cast"}, + //{tipb::ScalarFuncSig::AesEncrypt, "cast"}, + //{tipb::ScalarFuncSig::Compress, "cast"}, + //{tipb::ScalarFuncSig::MD5, "cast"}, + //{tipb::ScalarFuncSig::Password, "cast"}, + //{tipb::ScalarFuncSig::RandomBytes, "cast"}, + //{tipb::ScalarFuncSig::SHA1, "cast"}, + //{tipb::ScalarFuncSig::SHA2, "cast"}, + //{tipb::ScalarFuncSig::Uncompress, "cast"}, + //{tipb::ScalarFuncSig::UncompressedLength, "cast"}, + + //{tipb::ScalarFuncSig::Database, "cast"}, + //{tipb::ScalarFuncSig::FoundRows, "cast"}, + //{tipb::ScalarFuncSig::CurrentUser, "cast"}, + //{tipb::ScalarFuncSig::User, "cast"}, + //{tipb::ScalarFuncSig::ConnectionID, "cast"}, + //{tipb::ScalarFuncSig::LastInsertID, "cast"}, + //{tipb::ScalarFuncSig::LastInsertIDWithID, "cast"}, + //{tipb::ScalarFuncSig::Version, "cast"}, + //{tipb::ScalarFuncSig::TiDBVersion, "cast"}, + //{tipb::ScalarFuncSig::RowCount, "cast"}, + + //{tipb::ScalarFuncSig::Sleep, "cast"}, + //{tipb::ScalarFuncSig::Lock, "cast"}, + //{tipb::ScalarFuncSig::ReleaseLock, "cast"}, + //{tipb::ScalarFuncSig::DecimalAnyValue, "cast"}, + //{tipb::ScalarFuncSig::DurationAnyValue, "cast"}, + //{tipb::ScalarFuncSig::IntAnyValue, "cast"}, + //{tipb::ScalarFuncSig::JSONAnyValue, "cast"}, + //{tipb::ScalarFuncSig::RealAnyValue, "cast"}, + //{tipb::ScalarFuncSig::StringAnyValue, "cast"}, + 
//{tipb::ScalarFuncSig::TimeAnyValue, "cast"}, + //{tipb::ScalarFuncSig::InetAton, "cast"}, + //{tipb::ScalarFuncSig::InetNtoa, "cast"}, + //{tipb::ScalarFuncSig::Inet6Aton, "cast"}, + //{tipb::ScalarFuncSig::Inet6Ntoa, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4Compat, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4Mapped, "cast"}, + //{tipb::ScalarFuncSig::IsIPv6, "cast"}, + //{tipb::ScalarFuncSig::UUID, "cast"}, + + // {tipb::ScalarFuncSig::LikeSig, "like3Args"}, + //{tipb::ScalarFuncSig::RegexpBinarySig, "cast"}, + //{tipb::ScalarFuncSig::RegexpSig, "cast"}, + + //{tipb::ScalarFuncSig::JsonExtractSig, "cast"}, + //{tipb::ScalarFuncSig::JsonUnquoteSig, "cast"}, + //{tipb::ScalarFuncSig::JsonTypeSig, "cast"}, + //{tipb::ScalarFuncSig::JsonSetSig, "cast"}, + //{tipb::ScalarFuncSig::JsonInsertSig, "cast"}, + //{tipb::ScalarFuncSig::JsonReplaceSig, "cast"}, + //{tipb::ScalarFuncSig::JsonRemoveSig, "cast"}, + //{tipb::ScalarFuncSig::JsonMergeSig, "cast"}, + //{tipb::ScalarFuncSig::JsonObjectSig, "cast"}, + //{tipb::ScalarFuncSig::JsonArraySig, "cast"}, + //{tipb::ScalarFuncSig::JsonValidJsonSig, "cast"}, + //{tipb::ScalarFuncSig::JsonContainsSig, "cast"}, + //{tipb::ScalarFuncSig::JsonArrayAppendSig, "cast"}, + //{tipb::ScalarFuncSig::JsonArrayInsertSig, "cast"}, + //{tipb::ScalarFuncSig::JsonMergePatchSig, "cast"}, + //{tipb::ScalarFuncSig::JsonMergePreserveSig, "cast"}, + //{tipb::ScalarFuncSig::JsonContainsPathSig, "cast"}, + //{tipb::ScalarFuncSig::JsonPrettySig, "cast"}, + //{tipb::ScalarFuncSig::JsonQuoteSig, "cast"}, + //{tipb::ScalarFuncSig::JsonSearchSig, "cast"}, + //{tipb::ScalarFuncSig::JsonStorageSizeSig, "cast"}, + //{tipb::ScalarFuncSig::JsonDepthSig, "cast"}, + //{tipb::ScalarFuncSig::JsonKeysSig, "cast"}, + //{tipb::ScalarFuncSig::JsonLengthSig, "cast"}, + //{tipb::ScalarFuncSig::JsonKeys2ArgsSig, "cast"}, + //{tipb::ScalarFuncSig::JsonValidStringSig, "cast"}, + + //{tipb::ScalarFuncSig::DateFormatSig, "cast"}, + 
//{tipb::ScalarFuncSig::DateLiteral, "cast"}, + //{tipb::ScalarFuncSig::DateDiff, "cast"}, + //{tipb::ScalarFuncSig::NullTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::TimeStringTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::DurationDurationTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::DurationStringTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::StringTimeTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::StringDurationTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::StringStringTimeDiff, "cast"}, + //{tipb::ScalarFuncSig::TimeTimeTimeDiff, "cast"}, + + //{tipb::ScalarFuncSig::Date, "cast"}, + //{tipb::ScalarFuncSig::Hour, "cast"}, + //{tipb::ScalarFuncSig::Minute, "cast"}, + //{tipb::ScalarFuncSig::Second, "cast"}, + //{tipb::ScalarFuncSig::MicroSecond, "cast"}, + //{tipb::ScalarFuncSig::Month, "cast"}, + //{tipb::ScalarFuncSig::MonthName, "cast"}, + + //{tipb::ScalarFuncSig::NowWithArg, "cast"}, + //{tipb::ScalarFuncSig::NowWithoutArg, "cast"}, + + //{tipb::ScalarFuncSig::DayName, "cast"}, + //{tipb::ScalarFuncSig::DayOfMonth, "cast"}, + //{tipb::ScalarFuncSig::DayOfWeek, "cast"}, + //{tipb::ScalarFuncSig::DayOfYear, "cast"}, + + //{tipb::ScalarFuncSig::WeekWithMode, "cast"}, + //{tipb::ScalarFuncSig::WeekWithoutMode, "cast"}, + //{tipb::ScalarFuncSig::WeekDay, "cast"}, + //{tipb::ScalarFuncSig::WeekOfYear, "cast"}, + + //{tipb::ScalarFuncSig::Year, "cast"}, + //{tipb::ScalarFuncSig::YearWeekWithMode, "cast"}, + //{tipb::ScalarFuncSig::YearWeekWithoutMode, "cast"}, + + //{tipb::ScalarFuncSig::GetFormat, "cast"}, + //{tipb::ScalarFuncSig::SysDateWithFsp, "cast"}, + //{tipb::ScalarFuncSig::SysDateWithoutFsp, "cast"}, + //{tipb::ScalarFuncSig::CurrentDate, "cast"}, + //{tipb::ScalarFuncSig::CurrentTime0Arg, "cast"}, + //{tipb::ScalarFuncSig::CurrentTime1Arg, "cast"}, + + //{tipb::ScalarFuncSig::Time, "cast"}, + //{tipb::ScalarFuncSig::TimeLiteral, "cast"}, + //{tipb::ScalarFuncSig::UTCDate, "cast"}, + //{tipb::ScalarFuncSig::UTCTimestampWithArg, "cast"}, + 
//{tipb::ScalarFuncSig::UTCTimestampWithoutArg, "cast"}, + + //{tipb::ScalarFuncSig::AddDatetimeAndDuration, "cast"}, + //{tipb::ScalarFuncSig::AddDatetimeAndString, "cast"}, + //{tipb::ScalarFuncSig::AddTimeDateTimeNull, "cast"}, + //{tipb::ScalarFuncSig::AddStringAndDuration, "cast"}, + //{tipb::ScalarFuncSig::AddStringAndString, "cast"}, + //{tipb::ScalarFuncSig::AddTimeStringNull, "cast"}, + //{tipb::ScalarFuncSig::AddDurationAndDuration, "cast"}, + //{tipb::ScalarFuncSig::AddDurationAndString, "cast"}, + //{tipb::ScalarFuncSig::AddTimeDurationNull, "cast"}, + //{tipb::ScalarFuncSig::AddDateAndDuration, "cast"}, + //{tipb::ScalarFuncSig::AddDateAndString, "cast"}, + + //{tipb::ScalarFuncSig::SubDatetimeAndDuration, "cast"}, + //{tipb::ScalarFuncSig::SubDatetimeAndString, "cast"}, + //{tipb::ScalarFuncSig::SubTimeDateTimeNull, "cast"}, + //{tipb::ScalarFuncSig::SubStringAndDuration, "cast"}, + //{tipb::ScalarFuncSig::SubStringAndString, "cast"}, + //{tipb::ScalarFuncSig::SubTimeStringNull, "cast"}, + //{tipb::ScalarFuncSig::SubDurationAndDuration, "cast"}, + //{tipb::ScalarFuncSig::SubDurationAndString, "cast"}, + //{tipb::ScalarFuncSig::SubDateAndDuration, "cast"}, + //{tipb::ScalarFuncSig::SubDateAndString, "cast"}, + + //{tipb::ScalarFuncSig::UnixTimestampCurrent, "cast"}, + //{tipb::ScalarFuncSig::UnixTimestampInt, "cast"}, + //{tipb::ScalarFuncSig::UnixTimestampDec, "cast"}, + + //{tipb::ScalarFuncSig::ConvertTz, "cast"}, + //{tipb::ScalarFuncSig::MakeDate, "cast"}, + //{tipb::ScalarFuncSig::MakeTime, "cast"}, + //{tipb::ScalarFuncSig::PeriodAdd, "cast"}, + //{tipb::ScalarFuncSig::PeriodDiff, "cast"}, + //{tipb::ScalarFuncSig::Quarter, "cast"}, + + //{tipb::ScalarFuncSig::SecToTime, "cast"}, + //{tipb::ScalarFuncSig::TimeToSec, "cast"}, + //{tipb::ScalarFuncSig::TimestampAdd, "cast"}, + //{tipb::ScalarFuncSig::ToDays, "cast"}, + //{tipb::ScalarFuncSig::ToSeconds, "cast"}, + //{tipb::ScalarFuncSig::UTCTimeWithArg, "cast"}, + 
//{tipb::ScalarFuncSig::UTCTimeWithoutArg, "cast"}, + //{tipb::ScalarFuncSig::Timestamp1Arg, "cast"}, + //{tipb::ScalarFuncSig::Timestamp2Args, "cast"}, + //{tipb::ScalarFuncSig::TimestampLiteral, "cast"}, + + //{tipb::ScalarFuncSig::LastDay, "cast"}, + //{tipb::ScalarFuncSig::StrToDateDate, "cast"}, + //{tipb::ScalarFuncSig::StrToDateDatetime, "cast"}, + //{tipb::ScalarFuncSig::StrToDateDuration, "cast"}, + //{tipb::ScalarFuncSig::FromUnixTime1Arg, "cast"}, + //{tipb::ScalarFuncSig::FromUnixTime2Arg, "cast"}, + //{tipb::ScalarFuncSig::ExtractDatetime, "cast"}, + //{tipb::ScalarFuncSig::ExtractDuration, "cast"}, + + //{tipb::ScalarFuncSig::AddDateStringString, "cast"}, + //{tipb::ScalarFuncSig::AddDateStringInt, "cast"}, + //{tipb::ScalarFuncSig::AddDateStringDecimal, "cast"}, + //{tipb::ScalarFuncSig::AddDateIntString, "cast"}, + //{tipb::ScalarFuncSig::AddDateIntInt, "cast"}, + //{tipb::ScalarFuncSig::AddDateDatetimeString, "cast"}, + //{tipb::ScalarFuncSig::AddDateDatetimeInt, "cast"}, + + //{tipb::ScalarFuncSig::SubDateStringString, "cast"}, + //{tipb::ScalarFuncSig::SubDateStringInt, "cast"}, + //{tipb::ScalarFuncSig::SubDateStringDecimal, "cast"}, + //{tipb::ScalarFuncSig::SubDateIntString, "cast"}, + //{tipb::ScalarFuncSig::SubDateIntInt, "cast"}, + //{tipb::ScalarFuncSig::SubDateDatetimeString, "cast"}, + //{tipb::ScalarFuncSig::SubDateDatetimeInt, "cast"}, + + //{tipb::ScalarFuncSig::FromDays, "cast"}, + //{tipb::ScalarFuncSig::TimeFormat, "cast"}, + //{tipb::ScalarFuncSig::TimestampDiff, "cast"}, + + //{tipb::ScalarFuncSig::BitLength, "cast"}, + //{tipb::ScalarFuncSig::Bin, "cast"}, + //{tipb::ScalarFuncSig::ASCII, "cast"}, + //{tipb::ScalarFuncSig::Char, "cast"}, + // {tipb::ScalarFuncSig::CharLength, "lengthUTF8"}, + //{tipb::ScalarFuncSig::Concat, "cast"}, + //{tipb::ScalarFuncSig::ConcatWS, "cast"}, + //{tipb::ScalarFuncSig::Convert, "cast"}, + //{tipb::ScalarFuncSig::Elt, "cast"}, + //{tipb::ScalarFuncSig::ExportSet3Arg, "cast"}, + 
//{tipb::ScalarFuncSig::ExportSet4Arg, "cast"}, + //{tipb::ScalarFuncSig::ExportSet5Arg, "cast"}, + //{tipb::ScalarFuncSig::FieldInt, "cast"}, + //{tipb::ScalarFuncSig::FieldReal, "cast"}, + //{tipb::ScalarFuncSig::FieldString, "cast"}, + + //{tipb::ScalarFuncSig::FindInSet, "cast"}, + //{tipb::ScalarFuncSig::Format, "cast"}, + //{tipb::ScalarFuncSig::FormatWithLocale, "cast"}, + //{tipb::ScalarFuncSig::FromBase64, "cast"}, + //{tipb::ScalarFuncSig::HexIntArg, "cast"}, + //{tipb::ScalarFuncSig::HexStrArg, "cast"}, + //{tipb::ScalarFuncSig::Insert, "cast"}, + //{tipb::ScalarFuncSig::InsertBinary, "cast"}, + //{tipb::ScalarFuncSig::Instr, "cast"}, + //{tipb::ScalarFuncSig::InstrBinary, "cast"}, + + // {tipb::ScalarFuncSig::LTrim, "ltrim"}, + //{tipb::ScalarFuncSig::Left, "cast"}, + //{tipb::ScalarFuncSig::LeftBinary, "cast"}, + // {tipb::ScalarFuncSig::Length, "length"}, + //{tipb::ScalarFuncSig::Locate2Args, "cast"}, + //{tipb::ScalarFuncSig::Locate3Args, "cast"}, + //{tipb::ScalarFuncSig::LocateBinary2Args, "cast"}, + //{tipb::ScalarFuncSig::LocateBinary3Args, "cast"}, + + // {tipb::ScalarFuncSig::Lower, "lower"}, + //{tipb::ScalarFuncSig::Lpad, "cast"}, + //{tipb::ScalarFuncSig::LpadBinary, "cast"}, + //{tipb::ScalarFuncSig::MakeSet, "cast"}, + //{tipb::ScalarFuncSig::OctInt, "cast"}, + //{tipb::ScalarFuncSig::OctString, "cast"}, + //{tipb::ScalarFuncSig::Ord, "cast"}, + //{tipb::ScalarFuncSig::Quote, "cast"}, + // {tipb::ScalarFuncSig::RTrim, "rtrim"}, + //{tipb::ScalarFuncSig::Repeat, "cast"}, + //{tipb::ScalarFuncSig::Replace, "cast"}, + //{tipb::ScalarFuncSig::Reverse, "cast"}, + //{tipb::ScalarFuncSig::ReverseBinary, "cast"}, + //{tipb::ScalarFuncSig::Right, "cast"}, + //{tipb::ScalarFuncSig::RightBinary, "cast"}, + //{tipb::ScalarFuncSig::Rpad, "cast"}, + //{tipb::ScalarFuncSig::RpadBinary, "cast"}, + //{tipb::ScalarFuncSig::Space, "cast"}, + //{tipb::ScalarFuncSig::Strcmp, "cast"}, + //{tipb::ScalarFuncSig::Substring2Args, "cast"}, + 
//{tipb::ScalarFuncSig::Substring3Args, "cast"}, + //{tipb::ScalarFuncSig::SubstringBinary2Args, "cast"}, + //{tipb::ScalarFuncSig::SubstringBinary3Args, "cast"}, + //{tipb::ScalarFuncSig::SubstringIndex, "cast"}, + + //{tipb::ScalarFuncSig::ToBase64, "cast"}, + //{tipb::ScalarFuncSig::Trim1Arg, "cast"}, + //{tipb::ScalarFuncSig::Trim2Args, "cast"}, + //{tipb::ScalarFuncSig::Trim3Args, "cast"}, + //{tipb::ScalarFuncSig::UnHex, "cast"}, + // {tipb::ScalarFuncSig::Upper, "upper"}, +}; + +} // namespace DM + +} // namespace DB diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index 95414564309..ce19fca591c 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -183,9 +183,13 @@ std::pair analyzeMetaFile( // // check the checksum of WriteBatch const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum); const auto wb_checksum = PageUtil::get(wb_start_pos + wb_bytes_without_checksum); - if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum)) + const auto checksum_calc = CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum); + if (wb_checksum != checksum_calc) { - throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data), + std::stringstream ss; + ss << "expected: " << std::hex << wb_checksum << ", but: " << checksum_calc; + throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data) + + ", bytes: " + DB::toString(wb_bytes) + ", " + ss.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH); } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index a37089db8f2..c2dc4a9e7d4 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -536,8 +537,32 @@ BlockInputStreams 
StorageDeltaMerge::read( // } } - return store->read( - context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size); + DM::RSOperatorPtr rs_operator = DM::EMPTY_FILTER; + if (likely(query_info.dag_query)) + { + /// Query from TiDB / TiSpark + auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr { + const ColumnDefines & defines = this->store->getTableColumns(); + auto iter = std::find_if( + defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); + if (iter != defines.end()) + return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; + else + // Maybe throw an exception? Or check if `type` is nullptr before creating filter? + return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; + }; + rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, std::move(create_attr_by_column_id), log); + } + else + { + // Query from ch client + rs_operator = FilterParser::parseSelectQuery(select_query, log); + } + if (likely(rs_operator != DM::EMPTY_FILTER)) + LOG_TRACE(log, "RS operator: " << rs_operator->toString()); + + return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, + rs_operator, max_block_size); } } @@ -549,7 +574,7 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t rows) { ColumnDefines to_read{getExtraHandleColumnDefine()}; - auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64)[0]; + auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64, EMPTY_FILTER)[0]; stream->readPrefix(); Block block; while ((block = stream->read())) @@ -563,7 +588,7 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t rows) DM::HandleRange range = DM::HandleRange::newAll(); { ColumnDefines 
to_read{getExtraHandleColumnDefine()}; - auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64)[0]; + auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64, EMPTY_FILTER)[0]; stream->readPrefix(); Block block; size_t index = 0; From d891c2467f008bc222148ee9b6454a815cebeee7 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 19 Nov 2019 12:31:39 +0800 Subject: [PATCH 02/13] multiple conditions with operator and by default --- .../FilterParser/FilterParser_dag.cpp | 17 ++++++++++++++--- .../tests/gtest_dm_delta_merge_store.cpp | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp index d55e7763081..3c36c973542 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp @@ -208,11 +208,22 @@ RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info, FilterP } const auto & selection = dag_info.dag.getSelection(); - if (likely(selection.conditions_size() == 1)) + if (selection.conditions_size() == 1) op = cop::parseTiExpr(selection.conditions(0), column_ids, creator, log); else - // This should not happen - op = createUnsupported(selection.DebugString(), "tipb::Selection with multiple conditions", false); + { + /// By default, multiple conditions with operator "and" + RSOperators children; + for (Int32 i = 0; i < selection.conditions_size(); ++i) + { + const auto & child = selection.conditions(i); + if (isFunctionExpr(child)) + children.emplace_back(cop::parseTiExpr(child, column_ids, creator, log)); + else + children.emplace_back(createUnsupported(child.DebugString(), "child of logical and is not function", false)); + } + op = createAnd(children); + } return op; } diff --git 
a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index e74df034321..1cdeef1d770 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -191,6 +191,7 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -279,6 +280,7 @@ TEST_F(DeltaMergeStore_test, DeleteRead) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -314,6 +316,7 @@ TEST_F(DeltaMergeStore_test, DeleteRead) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -359,6 +362,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -402,6 +406,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -431,6 +436,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ UInt64(1), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -482,6 +488,7 @@ TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock) {HandleRange::newAll()}, 
/* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -519,6 +526,7 @@ TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; // block_num represents index of current segment @@ -574,6 +582,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -595,6 +604,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ tso2, + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -616,6 +626,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ tso1, + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -637,6 +648,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso) {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ tso1 - 1, + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -683,6 +695,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -802,6 +815,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -924,6 +938,7 @@ try 
{HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1027,6 +1042,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1137,6 +1153,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -1222,6 +1239,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -1326,6 +1344,7 @@ try {HandleRange::newAll()}, /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; From be7dc381cb4e05cbc34cd0149ac2befa2377cb40 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 19 Nov 2019 19:19:40 +0800 Subject: [PATCH 03/13] add more debugging info --- dbms/src/Storages/DeltaMerge/Chunk.h | 22 +++++++++---- .../DeltaMerge/ChunkBlockInputStream.h | 9 ++++++ dbms/src/Storages/DeltaMerge/Index/MinMax.h | 26 +++++++++++++++ .../Storages/DeltaMerge/Index/MinMaxIndex.cpp | 5 +++ .../Storages/DeltaMerge/Index/MinMaxIndex.h | 3 ++ dbms/src/Storages/DeltaMerge/Segment.cpp | 32 +++++++++++++++++++ dbms/src/Storages/Page/PageFile.h | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 24 ++++++++++---- 8 files changed, 108 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h index d0b4dda2b99..4d215b1b4fa 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.h +++ b/dbms/src/Storages/DeltaMerge/Chunk.h @@ -108,6 +108,14 @@ class Chunk void 
serialize(WriteBuffer & buf) const; static Chunk deserialize(ReadBuffer & buf); + String info() const + { + if (likely(!is_delete_range)) + return "Chunk[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + "]"; + else + return "DeleteRange[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + ")"; + } + private: Handle handle_start; Handle handle_end; @@ -123,15 +131,15 @@ using GenPageId = std::function; Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb); Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb); -void serializeChunks(WriteBuffer & buf, - Chunks::const_iterator begin, +void serializeChunks(WriteBuffer & buf, + Chunks::const_iterator begin, Chunks::const_iterator end, - const Chunk * extra1 = nullptr, - const Chunk * extra2 = nullptr); -void serializeChunks(WriteBuffer & buf, // - Chunks::const_iterator begin, + const Chunk * extra1 = nullptr, + const Chunk * extra2 = nullptr); +void serializeChunks(WriteBuffer & buf, // + Chunks::const_iterator begin, Chunks::const_iterator end, - const Chunks & extr_chunks); + const Chunks & extr_chunks); Chunks deserializeChunks(ReadBuffer & buf); diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h index 0f5f8635103..06c1609ee7f 100644 --- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h @@ -32,6 +32,15 @@ class ChunkBlockInputStream final : public IBlockInputStream } } + ~ChunkBlockInputStream() + { + size_t num_skipped = 0; + for (const auto & is_skip : skip_chunks) + num_skipped += is_skip; + + LOG_DEBUG(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks"); + } + String getName() const override { return "Chunk"; } Block getHeader() const override { return toEmptyBlock(read_columns); } diff --git 
a/dbms/src/Storages/DeltaMerge/Index/MinMax.h b/dbms/src/Storages/DeltaMerge/Index/MinMax.h index 23db275bf53..4641aa3242d 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMax.h +++ b/dbms/src/Storages/DeltaMerge/Index/MinMax.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -93,6 +95,8 @@ struct MinMaxValue virtual RSResult checkEqual(const Field & value, const DataTypePtr & type) = 0; virtual RSResult checkGreater(const Field & value, const DataTypePtr & type) = 0; virtual RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) = 0; + + virtual String toString() const = 0; }; /// Number types. @@ -168,6 +172,13 @@ struct MinMaxValueFixed : public MinMaxValue RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } // clang-format on + + String toString() const override + { + std::stringstream ss; + ss << "{\"type\":\"fixed\",\"min\":\"" << DB::toString(min) << "\",\"max\":\"" << DB::toString(max) << "\"}"; + return ss.str(); + } }; /// String type only. @@ -244,6 +255,13 @@ struct MinMaxValueString : public MinMaxValue RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } // clang-format on + + String toString() const override + { + std::stringstream ss; + ss << "{\"type\":\"string\",\"min\":\"" << min << "\",\"max\":\"" << max << "\"}"; + return ss.str(); + } }; /// Other types. 
@@ -314,6 +332,14 @@ struct MinMaxValueDataGeneric : public MinMaxValue RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } // clang-format on + + String toString() const override + { + std::stringstream ss; + ss << "{\"type\":\"generic\",\"min\":\"" << applyVisitor(FieldVisitorToString(), min) << "\",\"max\":\"" + << applyVisitor(FieldVisitorToString(), max) << "\"}"; + return ss.str(); + } }; diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp index 5ad3fe8d8d2..d3c26a2e3eb 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp @@ -114,5 +114,10 @@ RSResult MinMaxIndex::checkGreaterEqual(const Field & value, const DataTypePtr & return minmax->checkGreaterEqual(value, type); } +String MinMaxIndex::toString() const +{ + return this->minmax->toString(); +} + } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h index 050c294e288..c29d2492a4e 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h @@ -41,6 +41,9 @@ class MinMaxIndex RSResult checkEqual(const Field & value, const DataTypePtr & type); RSResult checkGreater(const Field & value, const DataTypePtr & type, int nan_direction); RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type, int nan_direction); + + String toString() const; + }; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 64ae9165c87..5ef4a47bf59 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ 
b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -139,6 +139,35 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) segment->delta->restore(OpContext::createForLogStorage(context)); segment->stable->restore(OpContext::createForDataStorage(context)); + if (unlikely(segment->log->trace())) + { + for (const auto & chunk : segment->delta->getChunks()) + { + for (auto && [col_id, meta] : chunk.getMetas()) + { + (void)col_id; + if (meta.minmax) + { + LOG_TRACE(segment->log, + "Segment[" << segment->segmentId() << "] VS[delta] " << chunk.info() << " Col[" << meta.col_id << "] MinMax[" + << meta.minmax->toString() << "]"); + } + } + } + for (const auto & chunk : segment->stable->getChunks()) + { + for (auto && [col_id, meta] : chunk.getMetas()) + { + (void)col_id; + if (meta.minmax) + { + LOG_TRACE(segment->log, + "Segment[" << segment->segmentId() << "] VS[stable] " << chunk.info() << " Col[" << meta.col_id << "] MinMax[" + << meta.minmax->toString() << "]"); + } + } + } + } return segment; } @@ -461,6 +490,7 @@ DiskValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, data_stream = std::make_shared>(data_stream, range, 0); data_stream = std::make_shared>(data_stream, handle, min_version); + //TODO: stable chunks should split pk OpContext opc = OpContext::createForDataStorage(dm_context); Chunks new_stable_chunks = DiskValueSpace::writeChunks(opc, data_stream, wbs.data); @@ -851,6 +881,7 @@ SegmentPair Segment::doSplitPhysical(DMContext & dm_context, STABLE_CHUNK_ROWS); my_data = std::make_shared>(my_data, my_range, 0); my_data = std::make_shared>(my_data, handle, min_version); + //TODO: stable chunks should split pk auto tmp = DiskValueSpace::writeChunks(opc, my_data, wbs.data); my_new_stable_chunks.swap(tmp); } @@ -868,6 +899,7 @@ SegmentPair Segment::doSplitPhysical(DMContext & dm_context, STABLE_CHUNK_ROWS); other_data = std::make_shared>(other_data, other_range, 0); other_data = std::make_shared>(other_data, handle, min_version); + 
//TODO: stable chunks should split pk auto tmp = DiskValueSpace::writeChunks(opc, other_data, wbs.data); other_new_stable_chunks.swap(tmp); } diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h index 699a37d2e4e..9fa99c743cb 100644 --- a/dbms/src/Storages/Page/PageFile.h +++ b/dbms/src/Storages/Page/PageFile.h @@ -124,11 +124,11 @@ class PageFile : public Allocator bool isExist() const; void removeDataIfExists() const; + String folderPath() const; private: /// Create a new page file. PageFile(PageFileId file_id_, UInt32 level_, const String & parent_path, bool is_tmp_, bool is_create, Poco::Logger * log); - String folderPath() const; String dataPath() const { return folderPath() + "/page"; } String metaPath() const { return folderPath() + "/meta"; } diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 0f006e0370f..71e0731ce07 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -83,16 +83,26 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config #ifdef DELTA_VERSION_SET for (auto & page_file : page_files) { - PageEntriesEdit edit; - const_cast(page_file).readAndSetPageMetas(edit); + try + { + PageEntriesEdit edit; + const_cast(page_file).readAndSetPageMetas(edit); - // Only level 0 is writable. - if (page_file.getLevel() == 0) + // Only level 0 is writable. + if (page_file.getLevel() == 0) + { + write_file = page_file; + } + + // apply edit to new version + versioned_page_entries.apply(edit); + } + catch (Exception & e) { - write_file = page_file; + /// Better diagnostics. 
+ e.addMessage("(while applying edit from " + page_file.folderPath() + " to PageStorage: " + storage_name + ")"); + throw; } - // apply edit to new version - versioned_page_entries.apply(edit); } #else auto snapshot = versioned_page_entries.getSnapshot(); From 270d2a6596c469497b5fd05eb867f644dbe2f928 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 21 Nov 2019 16:26:30 +0800 Subject: [PATCH 04/13] [FLASH-477] Make chunks in stable not overlap --- .../DMVersionFilterBlockInputStream.h | 16 +- .../Storages/DeltaMerge/DeltaMergeHelpers.h | 22 +- .../Storages/DeltaMerge/DiskValueSpace.cpp | 117 +++++++++- dbms/src/Storages/DeltaMerge/DiskValueSpace.h | 4 +- .../DeltaMerge/tests/dm_basic_include.h | 54 +++++ .../tests/gtest_dm_disk_value_space.cpp | 214 +++++++++++++++++- 6 files changed, 399 insertions(+), 28 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index fe1ea157cdc..31de950323e 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -77,7 +77,7 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream bool initNextBlock() { - raw_block = readNextBlock(); + raw_block = ::DB::DM::readNextBlock(children.back()); if (!raw_block) { handle_col_data = nullptr; @@ -94,20 +94,6 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream } } - /// This method guarantees that the returned valid block is not empty. 
- Block readNextBlock() - { - while (true) - { - Block res = children.back()->read(); - if (!res) - return {}; - if (!res.rows()) - continue; - return res; - } - } - private: UInt64 version_limit; Block header; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index 5371f6209f2..bb6beb415ec 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -123,7 +124,12 @@ inline PaddedPODArray const * getColumnVectorDataPtr(const Block & block, siz return toColumnVectorDataPtr(block.getByPosition(pos).column); } -inline void addColumnToBlock(Block & block, ColId col_id, const String & col_name, const DataTypePtr & col_type, const ColumnPtr & col, const Field & default_value = Field()) +inline void addColumnToBlock(Block & block, + ColId col_id, + const String & col_name, + const DataTypePtr & col_type, + const ColumnPtr & col, + const Field & default_value = Field()) { ColumnWithTypeAndName column(col, col_type, col_name, col_id, default_value); block.insert(std::move(column)); @@ -137,6 +143,20 @@ inline Block toEmptyBlock(const ColumnDefines & columns) return block; } +/// This method guarantees that the returned valid block is not empty. 
+inline Block readNextBlock(const BlockInputStreamPtr & in) +{ + while (true) + { + Block res = in->read(); + if (!res) + return Block{}; + if (!res.rows()) + continue; + return res; + } +} + inline void convertColumn(Block & block, size_t pos, const DataTypePtr & to_type, const Context & context) { const IDataType * to_type_ptr = to_type.get(); diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index 716b3ec891b..899087ca6e0 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -265,19 +265,122 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con return {}; } -Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb) +namespace { +size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name) +{ + assert(cur_block); + if (!next_block) + return 0; + + auto cur_col = cur_block.getByName(pk_column_name).column; + const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1); + auto next_col = next_block.getByName(pk_column_name).column; + size_t cut_offset = 0; + for (/* */; cut_offset < next_col->size(); ++cut_offset) + { + const Int64 next_pk = next_col->getInt(cut_offset); + if (next_pk != last_curr_pk) + { + if constexpr (DM_RUN_CHECK) + { + if (unlikely(next_pk < last_curr_pk)) + throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk) + + " < " + toString(last_curr_pk), + ErrorCodes::LOGICAL_ERROR); + } + break; + } + } + return cut_offset; +} +} // namespace + +Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb) +{ + const String & pk_column_name = context.dm_context.handle_column.name; + if constexpr (DM_RUN_CHECK) + { + // Sanity check for existence of pk column 
+ assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64"))); + Block header = sorted_input_stream->getHeader(); + if (!header.has(pk_column_name)) + { + throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR); + } + } + // TODO: investigate which way is better for scan: written by chunks vs written by columns. Chunks chunks; + Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream); + Block next_block; while (true) { - Block block = input_stream->read(); - if (!block) + if (!cur_block) break; - if (!block.rows()) - continue; - Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block); - chunks.push_back(std::move(chunk)); + + next_block = ::DB::DM::readNextBlock(sorted_input_stream); + + const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name); + if (cut_offset != 0) + { + const size_t next_block_nrows = next_block.rows(); + for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx) + { + auto & cur_col_with_name = cur_block.getByPosition(col_idx); + auto & next_col_with_name = next_block.getByPosition(col_idx); + auto * cur_col_raw = const_cast(cur_col_with_name.column.get()); + cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset); + if (cut_offset != next_block_nrows) + { + // Pop front `cut_offset` elems from `next_col_with_name` + assert(next_block_nrows == next_col_with_name.column->size()); + MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty(); + cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, next_block_nrows - cut_offset); + next_col_with_name.column = cutted_next_column->getPtr(); + } + } + if (cut_offset != next_block_nrows) + { + // We merge some rows to `cur_block`, make it as a chunk. 
+ Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block); + chunks.emplace_back(std::move(chunk)); + cur_block = next_block; + } + // else we merge all rows from `next_block` to `cur_block`, continue to check if we should merge more blocks. + } + else + { + // There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`. + Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block); + chunks.emplace_back(std::move(chunk)); + cur_block = next_block; + } + } + if constexpr (DM_RUN_CHECK) + { + // Sanity check + if (chunks.size() > 1) + { + for (size_t i = 1; i < chunks.size(); ++i) + { + const Chunk & prev = chunks[i - 1]; + const Chunk & curr = chunks[i]; + if (prev.isDeleteRange() || curr.isDeleteRange()) + { + throw Exception("Unexpected DeleteRange in stable inputstream. prev:" + prev.info() + " curr: " + curr.info(), + ErrorCodes::LOGICAL_ERROR); + } + + const HandlePair prev_handle = prev.getHandleFirstLast(); + const HandlePair curr_handle = curr.getHandleFirstLast(); + // pk should be increasing and no overlap between chunks + if (prev_handle.second >= curr_handle.first) + { + throw Exception("Overlap chunks between " + prev.info() + " and " + curr.info(), ErrorCodes::LOGICAL_ERROR); + } + } + } } return chunks; } diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h index 2b83dfeaf68..58b65675c33 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h @@ -148,9 +148,9 @@ class DiskValueSpace AppendTaskPtr createAppendTask(const OpContext & context, WriteBatches & wbs, const BlockOrDelete & update) const; DiskValueSpacePtr applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update); - /// Write the blocks from input_stream into underlying storage, the returned chunks can be added to + /// Write the blocks from 
sorted_input_stream into underlying storage, the returned chunks can be added to /// specified value space instance by #setChunks or #appendChunkWithCache later. - static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb); + static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb); static Chunk writeDelete(const OpContext & context, const HandleRange & delete_range); diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 0c8306895a6..fbf85efe913 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -114,6 +114,60 @@ class DMTestEnv return block; } + /** + * Create a simple block with 3 columns: + * * `pk` - Int64 / `version` / `tag` + * @param pk `pk`'s value + * @param tso_beg `tso`'s value begin + * @param tso_end `tso`'s value end (not included) + * @return + */ + static Block prepareBlockWithIncreasingTso(Int64 pk, size_t tso_beg, size_t tso_end) + { + Block block; + const size_t num_rows = (tso_end - tso_beg); + { + ColumnWithTypeAndName col1(std::make_shared(), pk_name); + { + IColumn::MutablePtr m_col = col1.type->createColumn(); + // insert form large to small + for (size_t i = 0; i < num_rows; i++) + { + Field field = Int64(pk); + m_col->insert(field); + } + col1.column = std::move(m_col); + } + block.insert(col1); + + ColumnWithTypeAndName version_col(VERSION_COLUMN_TYPE, VERSION_COLUMN_NAME); + { + IColumn::MutablePtr m_col = version_col.type->createColumn(); + for (size_t i = 0; i < num_rows; ++i) + { + Field field = Int64(tso_beg + i); + m_col->insert(field); + } + version_col.column = std::move(m_col); + } + block.insert(version_col); + + ColumnWithTypeAndName tag_col(TAG_COLUMN_TYPE, TAG_COLUMN_NAME); + { + IColumn::MutablePtr m_col = tag_col.type->createColumn(); + auto & column_data = 
typeid_cast &>(*m_col).getData(); + column_data.resize(num_rows); + for (size_t i = 0; i < num_rows; ++i) + { + column_data[i] = 0; + } + tag_col.column = std::move(m_col); + } + block.insert(tag_col); + } + return block; + } + /// prepare a row like this: /// {"pk":pk, "version":tso, "delete_mark":mark, "colname":value} static Block prepareOneRowBlock(Int64 pk, UInt64 tso, UInt8 mark, const String & colname, const String & value) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index a151fc29d14..8ef7dcac27a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -36,7 +36,7 @@ class DiskValueSpace_test : public ::testing::Test storage_pool = std::make_unique("test.t1", path); Context & context = DMTestEnv::getContext(); - table_handle_define = ColumnDefine(1, "pk", std::make_shared()); + table_handle_define = ColumnDefine(1, DMTestEnv::pk_name, std::make_shared()); table_columns.clear(); table_columns.emplace_back(table_handle_define); table_columns.emplace_back(getVersionColumnDefine()); @@ -52,9 +52,9 @@ class DiskValueSpace_test : public ::testing::Test .handle_column = table_handle_define, .min_version = 0, - .not_compress = settings.not_compress_columns, + .not_compress = settings.not_compress_columns, - .segment_limit_rows = context.getSettingsRef().dm_segment_limit_rows, + .segment_limit_rows = context.getSettingsRef().dm_segment_limit_rows, .delta_limit_rows = context.getSettingsRef().dm_segment_delta_limit_rows, .delta_limit_bytes = context.getSettingsRef().dm_segment_delta_limit_bytes, @@ -188,6 +188,214 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead) } } +TEST_F(DiskValueSpace_test, writeChunks_OneBlock) +{ + const Int64 pk_min = 20, pk_max = 40; + Block block = DMTestEnv::prepareSimpleWriteBlock(pk_min, pk_max, false); + auto in = std::make_shared(block); + + 
WriteBatch wb; + auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context); + auto chunks = DiskValueSpace::writeChunks(opc, in, wb); + ASSERT_EQ(chunks.size(), 1UL); + + const Chunk & chunk = chunks[0]; + ASSERT_EQ(chunk.getRows(), size_t(pk_max - pk_min)); + ASSERT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min, pk_max - 1)); +} + +TEST_F(DiskValueSpace_test, writeChunks_NonOverlapBlocks) +{ + const Int64 pk_min = 20, pk_max = 40; + const Int64 pk_span = pk_max - pk_min; + BlockInputStreamPtr in = {}; + { + BlocksList blocks; + // First block [20, 30) + Block block = DMTestEnv::prepareSimpleWriteBlock(pk_min, pk_min + pk_span / 2, false); + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), pk_min); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), pk_min + pk_span / 2 - 1); + EXPECT_EQ(block.rows(), size_t(10)); + } + blocks.emplace_back(block); + // First block [30, 40) + block = DMTestEnv::prepareSimpleWriteBlock(pk_min + pk_span / 2, pk_max, false); + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), pk_min + pk_span / 2); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), pk_max - 1); + EXPECT_EQ(block.rows(), size_t(10)); + } + blocks.emplace_back(block); + in = std::make_shared(std::move(blocks)); + } + + WriteBatch wb; + auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context); + auto chunks = DiskValueSpace::writeChunks(opc, in, wb); + ASSERT_EQ(chunks.size(), 2UL); + + { + const Chunk & chunk = chunks[0]; + ASSERT_EQ(chunk.getRows(), size_t(pk_span / 2)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min, pk_min + pk_span / 2 - 1)); + } + { + const Chunk & chunk = chunks[1]; + ASSERT_EQ(chunk.getRows(), size_t(pk_span / 2)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min + pk_span / 2, pk_max - 1)); + } +} + +TEST_F(DiskValueSpace_test, writeChunks_OverlapBlocks) +{ + BlockInputStreamPtr in = {}; + { + BlocksList blocks; 
+ // First block [20, 30] + Block block = DMTestEnv::prepareSimpleWriteBlock(20, 31, false); + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 20); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30); + EXPECT_EQ(block.rows(), size_t(11)); + } + blocks.emplace_back(block); + + // Second block [30, 40), and pk=30 have multiple version + { + // version [100, 110) for pk=30 + Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 100, 110); + // pk [31,40] + Block block_2 = DMTestEnv::prepareSimpleWriteBlock(31, 41, false); + DM::concat(block_of_multi_versions, block_2); + block = block_of_multi_versions; + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 30); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 40); + EXPECT_EQ(block.rows(), size_t(10 + 10)); + } + } + blocks.emplace_back(block); + + // Third block [40, 50), and pk=40 have multiple version + { + // version [300, 305) for pk=40 + Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(40, 300, 305); + // pk [41,50) + Block block_2 = DMTestEnv::prepareSimpleWriteBlock(41, 50, false); + DM::concat(block_of_multi_versions, block_2); + block = block_of_multi_versions; + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 40); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 49); + EXPECT_EQ(block.rows(), size_t(5 + 9)); + } + } + blocks.emplace_back(block); + + in = std::make_shared(std::move(blocks)); + } + + WriteBatch wb; + auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context); + auto chunks = DiskValueSpace::writeChunks(opc, in, wb); + ASSERT_EQ(chunks.size(), 3UL); + + { + const Chunk & chunk = chunks[0]; + // should be [20, 30], and pk=30 with 11 versions + ASSERT_EQ(chunk.getRows(), size_t(11 + 10)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(20, 30)); + } + { + const Chunk & chunk = chunks[1]; + // should 
be [31, 40], and pk=40 with 6 versions + ASSERT_EQ(chunk.getRows(), size_t(9 + 6)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(31, 40)); + } + { + const Chunk & chunk = chunks[2]; + // should be [41, 50) + ASSERT_EQ(chunk.getRows(), size_t(9)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(41, 49)); + } +} + +TEST_F(DiskValueSpace_test, writeChunks_OverlapBlocksMerged) +{ + BlockInputStreamPtr in = {}; + { + BlocksList blocks; + // First block [20, 30] + Block block = DMTestEnv::prepareSimpleWriteBlock(20, 31, false); + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 20); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30); + EXPECT_EQ(block.rows(), size_t(11)); + } + blocks.emplace_back(block); + + // Second block [30, 31), and pk=30 have multiple version + { + // version [100, 110) for pk=30 + Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 100, 110); + block = block_of_multi_versions; + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 30); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30); + EXPECT_EQ(block.rows(), size_t(10)); + } + } + blocks.emplace_back(block); + + // Third block [30, 50), and pk=30 have multiple version + { + // version [300, 305) for pk=30 + Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 300, 305); + // pk [41,50) + Block block_2 = DMTestEnv::prepareSimpleWriteBlock(31, 50, false); + DM::concat(block_of_multi_versions, block_2); + block = block_of_multi_versions; + { + auto col = block.getByName(DMTestEnv::pk_name); + EXPECT_EQ(col.column->getInt(0), 30); + EXPECT_EQ(col.column->getInt(col.column->size() - 1), 49); + EXPECT_EQ(block.rows(), size_t(5 + 19)); + } + } + blocks.emplace_back(block); + + in = std::make_shared(std::move(blocks)); + } + + WriteBatch wb; + auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context); + auto chunks = 
DiskValueSpace::writeChunks(opc, in, wb); + // Second block is merge to the first + ASSERT_EQ(chunks.size(), 2UL); + + { + const Chunk & chunk = chunks[0]; + // should be [20, 30], and pk=30 with 1 + 10 + 5 versions + ASSERT_EQ(chunk.getRows(), size_t(11 + 10 + 5)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(20, 30)); + } + { + const Chunk & chunk = chunks[1]; + // should be [31, 50) + ASSERT_EQ(chunk.getRows(), size_t(19)); + EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(31, 49)); + } +} + + } // namespace tests } // namespace DM } // namespace DB From 04ff19c46c1a5953bd330c9b7ec5918db338259a Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 21 Nov 2019 20:34:06 +0800 Subject: [PATCH 05/13] Add option dm_enable_rough_set_filter --- dbms/src/Interpreters/Settings.h | 1 + dbms/src/Server/config.xml | 2 ++ dbms/src/Storages/StorageDeltaMerge.cpp | 45 ++++++++++++++----------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 44fdf168c9e..2da14dfc435 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -244,6 +244,7 @@ struct Settings M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\ M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\ M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. 
By default '0' means no limit.")\ + M(SettingBool, dm_enable_rough_set_filter, true, "whether to parse where expression as Rough Set Index filter or not") \ \ M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \ M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \ diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index 8507e1c412a..6f49c2da5bb 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -316,6 +316,8 @@ /var/lib/clickhouse/kvstore /var/lib/clickhouse/regmap http://127.0.0.1:13579 + + tmt false diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index c2dc4a9e7d4..dfbc2bbcc7d 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -538,28 +538,35 @@ BlockInputStreams StorageDeltaMerge::read( // } DM::RSOperatorPtr rs_operator = DM::EMPTY_FILTER; - if (likely(query_info.dag_query)) + const bool enable_rs_filter = context.getSettingsRef().dm_enable_rough_set_filter; + if (enable_rs_filter) { - /// Query from TiDB / TiSpark - auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr { - const ColumnDefines & defines = this->store->getTableColumns(); - auto iter = std::find_if( - defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); - if (iter != defines.end()) - return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; - else - // Maybe throw an exception? Or check if `type` is nullptr before creating filter? 
- return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; - }; - rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, std::move(create_attr_by_column_id), log); + if (likely(query_info.dag_query)) + { + /// Query from TiDB / TiSpark + auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr { + const ColumnDefines & defines = this->store->getTableColumns(); + auto iter = std::find_if( + defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); + if (iter != defines.end()) + return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; + else + // Maybe throw an exception? Or check if `type` is nullptr before creating filter? + return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; + }; + rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, std::move(create_attr_by_column_id), log); + } + else + { + // Query from ch client + rs_operator = FilterParser::parseSelectQuery(select_query, log); + } + if (likely(rs_operator != DM::EMPTY_FILTER)) + LOG_DEBUG(log, "Rough set filter: " << rs_operator->toString()); } else - { - // Query from ch client - rs_operator = FilterParser::parseSelectQuery(select_query, log); - } - if (likely(rs_operator != DM::EMPTY_FILTER)) - LOG_TRACE(log, "RS operator: " << rs_operator->toString()); + LOG_DEBUG(log, "Rough set filter is disabled."); + return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, rs_operator, max_block_size); From d5a5bfd5777077f6cc6608d027a6e6a5918c7bd5 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 21 Nov 2019 21:16:26 +0800 Subject: [PATCH 06/13] A quick impl for rough set filter ast parser --- .../DeltaMerge/FilterParser/FilterParser.h | 2 +- .../FilterParser/FilterParser_ast.cpp | 119 +++++++++++------- .../FilterParser/FilterParser_dag.cpp | 1 + dbms/src/Storages/StorageDeltaMerge.cpp | 12 +- 4 files 
changed, 90 insertions(+), 44 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 7a542b25246..9473b229b6c 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -21,7 +21,7 @@ class FilterParser public: /// From ast. using AttrCreatorByColumnName = std::function; - static RSOperatorPtr parseSelectQuery(const ASTSelectQuery & query, Poco::Logger * log); + static RSOperatorPtr parseSelectQuery(const ASTSelectQuery & query, AttrCreatorByColumnName && creator, Poco::Logger * log); public: /// From dag. diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp index 37579fdfb9e..783aaa87d01 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp @@ -24,22 +24,64 @@ namespace DM { namespace ast { +RSOperatorPtr +parseASTCompareFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * /*log*/) +{ + if (unlikely(func->arguments->children.size() != 2)) + return createUnsupported( + "", func->name + " with " + DB::toString(func->arguments->children.size()) + " children is not supported", false); + + /// Only support `column` `op` `constant` now. 
+ + Attr attr; + Field value; + UInt32 state = 0x0; + constexpr UInt32 state_has_column = 0x1; + constexpr UInt32 state_has_literal = 0x2; + constexpr UInt32 state_finish = state_has_column | state_has_literal; + for (auto & child : func->arguments->children) + { + if (auto * id = static_cast(child.get()); id != nullptr && id->kind == ASTIdentifier::Column) + { + state |= state_has_column; + const String & col_name = id->name; + attr = creator(col_name); + } + else if (auto * liter = static_cast(child.get()); liter != nullptr) + { + state |= state_has_literal; + value = liter->value; + } + } + + // TODO: null_direction + if (unlikely(state != state_finish)) + return createUnsupported("", "", false); + else if (func->name == "equals") + return createEqual(attr, value); + else if (func->name == "notEquals") + return createNotEqual(attr, value); + else if (func->name == "greater") + return createGreater(attr, value, -1); + else if (func->name == "greaterOrEquals") + return createGreaterEqual(attr, value, -1); + else if (func->name == "less") + return createLess(attr, value, -1); + else if (func->name == "lessOrEquals") + return createLessEqual(attr, value, -1); + return createUnsupported("", "Unknown compare func" + func->name, false); +} -RSOperatorPtr parseRSOperator(const ASTFunction * const func, Poco::Logger * log) +RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * log) { assert(func != nullptr); RSOperatorPtr op = EMPTY_FILTER; - - if (func->name == "equals") + if (func->name == "equals" || func->name == "notEquals" // + || func->name == "greater" || func->name == "greaterOrEquals" // + || func->name == "less" || func->name == "lessOrEquals") { - auto * lhs = static_cast(func->arguments->children[0].get()); - assert(lhs != nullptr); - assert(lhs->kind == ASTIdentifier::Kind::Column); - Attr attr{lhs->name, 0, DataTypeFactory::instance().get("Int32")}; - auto * rhs = 
static_cast(func->arguments->children[1].get()); - assert(rhs != nullptr); - op = createEqual(attr, rhs->value); + op = parseASTCompareFunction(func, creator, log); } else if (func->name == "or" || func->name == "and") { @@ -47,51 +89,44 @@ RSOperatorPtr parseRSOperator(const ASTFunction * const func, Poco::Logger * log for (const auto & child : func->arguments->children) { ASTFunction * sub_func = static_cast(child.get()); - assert(sub_func != nullptr); - children.emplace_back(parseRSOperator(sub_func, log)); + if (sub_func != nullptr) + { + children.emplace_back(parseASTFunction(sub_func, creator, log)); + } + else + { + children.emplace_back(createUnsupported("", "child of logical operator is not function", false)); + } } - op = createOr(children); + if (func->name == "or") + op = createOr(children); + else + op = createAnd(children); } else if (func->name == "not") { - assert(func->arguments->children.size() == 1); - ASTFunction * sub_func = static_cast(func->arguments->children[0].get()); - assert(sub_func != nullptr); - RSOperatorPtr sub_op = parseRSOperator(sub_func, log); - op = createNot(sub_op); + if (unlikely(func->arguments->children.size() != 1)) + op = createUnsupported("", "logical not with " + DB::toString(func->arguments->children.size()) + " children", false); + else + { + if (ASTFunction * sub_func = static_cast(func->arguments->children[0].get()); sub_func != nullptr) + op = createNot(parseASTFunction(sub_func, creator, log)); + else + op = createUnsupported("", "child of logical not is not function", false); + } } #if 0 - else if (func->name == "notEquals") - { - op = createEqual(); - } - else if (func->name == "greater") - { - op = createGreater(); - } - else if (func->name == "greaterOrEquals") - { - op = createGreaterEqual(); - } - else if (func->name == "less") - { - op = createLess(); - } - else if (func->name == "lessOrEquals") + else if (func->name == "in") { - op = createLessEqual(); } - else if (func->name == "in") + else if 
(func->name == "notIn") { - } else if (func->name == "like") { - } else if (func->name == "notLike") { - } #endif else @@ -106,7 +141,7 @@ RSOperatorPtr parseRSOperator(const ASTFunction * const func, Poco::Logger * log } // namespace ast -RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, Poco::Logger * log) +RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, AttrCreatorByColumnName && creator, Poco::Logger * log) { RSOperatorPtr op = EMPTY_FILTER; if (!query.where_expression) @@ -126,7 +161,7 @@ RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, Poco: std::string expr_tree = ss.str(); LOG_TRACE(log, " where expr: " << expr_tree); - op = ast::parseRSOperator(where, log); + op = ast::parseASTFunction(where, creator, log); return op; } diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp index 3c36c973542..abefa23e5d3 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp @@ -78,6 +78,7 @@ inline RSOperatorPtr parseTiCompareExpr( // expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " with state " + DB::toString(state) + " is not supported", false); else { + // TODO: null_direction RSOperatorPtr op; switch (filter_type) { diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index dfbc2bbcc7d..694ca48b65e 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -559,7 +559,17 @@ BlockInputStreams StorageDeltaMerge::read( // else { // Query from ch client - rs_operator = FilterParser::parseSelectQuery(select_query, log); + auto create_attr_by_column_id = [this](const String &col_name) -> Attr { + const ColumnDefines & defines = this->store->getTableColumns(); + auto iter = std::find_if( + defines.begin(), defines.end(), 
[&col_name](const ColumnDefine & d) -> bool { return d.name == col_name; }); + if (iter != defines.end()) + return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; + else + // Maybe throw an exception? Or check if `type` is nullptr before creating filter? + return Attr{.col_name = col_name, .col_id = 0, .type = DataTypePtr{}}; + }; + rs_operator = FilterParser::parseSelectQuery(select_query, std::move(create_attr_by_column_id), log); } if (likely(rs_operator != DM::EMPTY_FILTER)) LOG_DEBUG(log, "Rough set filter: " << rs_operator->toString()); From 08243869dcfa3ca702f584d18c0a6462f63cd497 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 22 Nov 2019 15:43:25 +0800 Subject: [PATCH 07/13] Fix bug: compare Date/DateTime to String in rough set filter --- dbms/src/Storages/DeltaMerge/DiskValueSpace.h | 1 + dbms/src/Storages/DeltaMerge/Index/ValueComparison.h | 12 ++++++------ dbms/src/Storages/DeltaMerge/Segment.cpp | 1 - 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h index 58b65675c33..770938de26c 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h @@ -186,6 +186,7 @@ class DiskValueSpace DiskValueSpacePtr tryFlushCache(const OpContext & context, WriteBatch & remove_data_wb, bool force = false); + // TODO: getInputStream can be removed ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const; MutableColumnMap cloneCache(); diff --git a/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h b/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h index 9220e695b55..1214e06b4e3 100644 --- a/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h +++ b/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h @@ -268,27 +268,27 @@ struct ValueComparision if (is_date) { - if constexpr (std::is_same_v) + if constexpr (std::is_same_v) { 
DayNum_t date; ReadBufferFromMemory in(left.data(), left.size()); readDateText(date, in); if (!in.eof()) throw Exception("String is too long for Date: " + left); - res = Op::apply(date, right); + res = Op::apply(date, right); return true; } } else if (is_date_time) { - if constexpr (std::is_same_v) + if constexpr (std::is_same_v) { time_t date_time; ReadBufferFromMemory in(left.data(), left.size()); readDateTimeText(date_time, in); if (!in.eof()) throw Exception("String is too long for DateTime: " + left); - res = Op::apply(date_time, right); + res = Op::apply(date_time, right); return true; } } @@ -307,7 +307,7 @@ struct ValueComparision } else if (is_enum8) { - if constexpr (std::is_same_v) + if constexpr (std::is_same_v) { auto type = static_cast(right_type.get()); auto left_enum_value = type->getValue(left); @@ -317,7 +317,7 @@ struct ValueComparision } else if (is_enum16) { - if constexpr (std::is_same_v) + if constexpr (std::is_same_v) { auto type = static_cast(right_type.get()); auto left_enum_value = type->getValue(left); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 5ef4a47bf59..8cc5cd2e1f7 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -608,7 +608,6 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, auto new_read_columns = arrangeReadColumns(dm_context.handle_column, read_columns); DeltaValueSpacePtr delta_value_space; - ChunkBlockInputStreamPtr stable_input_stream; DeltaIndexPtr delta_index; auto delta_block = segment_snap.delta->read(new_read_columns, storage_snap.log_reader, 0, segment_snap.delta_rows); From ef3133d763179f090432e44381acf557350625d6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 22 Nov 2019 15:48:20 +0800 Subject: [PATCH 08/13] Turn off verbose logging info in restoreSegment --- dbms/src/Storages/DeltaMerge/Segment.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8cc5cd2e1f7..3b99a04184c 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -139,6 +139,8 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) segment->delta->restore(OpContext::createForLogStorage(context)); segment->stable->restore(OpContext::createForDataStorage(context)); +#if 0 + /// Dump the min-max index info if (unlikely(segment->log->trace())) { for (const auto & chunk : segment->delta->getChunks()) @@ -168,6 +170,7 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) } } } +#endif return segment; } From c2baffa380552f050ab924b7b25ffdd211f2f7c8 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 22 Nov 2019 16:07:18 +0800 Subject: [PATCH 09/13] Add event tracking --- dbms/src/Common/ProfileEvents.cpp | 2 ++ dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp | 9 ++++++++- dbms/src/Storages/DeltaMerge/Segment.cpp | 3 --- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 53edcb41565..d9642007deb 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -197,6 +197,8 @@ M(DMSegmentMergeNS) \ M(DMFlushDeltaCache) \ M(DMFlushDeltaCacheNS) \ + M(DMWriteChunksWriteRows) \ + M(DMWriteChunksCopyRows) \ namespace ProfileEvents diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index 899087ca6e0..0c361b51927 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -14,6 +14,8 @@ namespace ProfileEvents { extern const Event DMFlushDeltaCache; extern const Event DMFlushDeltaCacheNS; +extern const Event DMWriteChunksWriteRows; +extern const Event DMWriteChunksCopyRows; } // namespace ProfileEvents namespace DB @@ -333,10 +335,13 @@ Chunks 
DiskValueSpace::writeChunks(const OpContext & context, const BlockInputSt cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset); if (cut_offset != next_block_nrows) { + // TODO: we can track the valid range instead of copying data. + size_t nrows_to_copy = next_block_nrows - cut_offset; + ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy); // Pop front `cut_offset` elems from `next_col_with_name` assert(next_block_nrows == next_col_with_name.column->size()); MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty(); - cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, next_block_nrows - cut_offset); + cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy); next_col_with_name.column = cutted_next_column->getPtr(); } } @@ -344,6 +349,7 @@ Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputSt { // We merge some rows to `cur_block`, make it as a chunk. Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block); + ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows()); chunks.emplace_back(std::move(chunk)); cur_block = next_block; } @@ -353,6 +359,7 @@ Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputSt { // There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`. 
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block); + ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows()); chunks.emplace_back(std::move(chunk)); cur_block = next_block; } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 3b99a04184c..4ea5c5660e3 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -493,7 +493,6 @@ DiskValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, data_stream = std::make_shared>(data_stream, range, 0); data_stream = std::make_shared>(data_stream, handle, min_version); - //TODO: stable chunks should split pk OpContext opc = OpContext::createForDataStorage(dm_context); Chunks new_stable_chunks = DiskValueSpace::writeChunks(opc, data_stream, wbs.data); @@ -883,7 +882,6 @@ SegmentPair Segment::doSplitPhysical(DMContext & dm_context, STABLE_CHUNK_ROWS); my_data = std::make_shared>(my_data, my_range, 0); my_data = std::make_shared>(my_data, handle, min_version); - //TODO: stable chunks should split pk auto tmp = DiskValueSpace::writeChunks(opc, my_data, wbs.data); my_new_stable_chunks.swap(tmp); } @@ -901,7 +899,6 @@ SegmentPair Segment::doSplitPhysical(DMContext & dm_context, STABLE_CHUNK_ROWS); other_data = std::make_shared>(other_data, other_range, 0); other_data = std::make_shared>(other_data, handle, min_version); - //TODO: stable chunks should split pk auto tmp = DiskValueSpace::writeChunks(opc, other_data, wbs.data); other_new_stable_chunks.swap(tmp); } From c03198ba771ff903e9e310899817fb7e387ca4c6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 22 Nov 2019 19:21:37 +0800 Subject: [PATCH 10/13] Add more debugging info for ast --- .../FilterParser/FilterParser_ast.cpp | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp 
b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp index 783aaa87d01..740f7e58bfa 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp @@ -24,12 +24,21 @@ namespace DM { namespace ast { + +String astToDebugString(const IAST * const ast) +{ + std::stringstream ss; + ast->dumpTree(ss); + return ss.str(); +} + RSOperatorPtr parseASTCompareFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * /*log*/) { if (unlikely(func->arguments->children.size() != 2)) - return createUnsupported( - "", func->name + " with " + DB::toString(func->arguments->children.size()) + " children is not supported", false); + return createUnsupported(astToDebugString(func), + func->name + " with " + DB::toString(func->arguments->children.size()) + " children is not supported", + false); /// Only support `column` `op` `constant` now. @@ -56,7 +65,7 @@ parseASTCompareFunction(const ASTFunction * const func, const FilterParser::Attr // TODO: null_direction if (unlikely(state != state_finish)) - return createUnsupported("", "", false); + return createUnsupported(astToDebugString(func), func->name + " with state " + DB::toString(state) + " is not supported", false); else if (func->name == "equals") return createEqual(attr, value); else if (func->name == "notEquals") @@ -69,7 +78,7 @@ parseASTCompareFunction(const ASTFunction * const func, const FilterParser::Attr return createLess(attr, value, -1); else if (func->name == "lessOrEquals") return createLessEqual(attr, value, -1); - return createUnsupported("", "Unknown compare func" + func->name, false); + return createUnsupported(astToDebugString(func), "Unknown compare func: " + func->name, false); } RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * log) @@ -95,7 +104,7 @@ RSOperatorPtr parseASTFunction(const 
ASTFunction * const func, const FilterParse } else { - children.emplace_back(createUnsupported("", "child of logical operator is not function", false)); + children.emplace_back(createUnsupported(astToDebugString(func), "child of logical operator is not function", false)); } } if (func->name == "or") @@ -106,13 +115,14 @@ RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParse else if (func->name == "not") { if (unlikely(func->arguments->children.size() != 1)) - op = createUnsupported("", "logical not with " + DB::toString(func->arguments->children.size()) + " children", false); + op = createUnsupported( + astToDebugString(func), "logical not with " + DB::toString(func->arguments->children.size()) + " children", false); else { if (ASTFunction * sub_func = static_cast(func->arguments->children[0].get()); sub_func != nullptr) op = createNot(parseASTFunction(sub_func, creator, log)); else - op = createUnsupported("", "child of logical not is not function", false); + op = createUnsupported(astToDebugString(func), "child of logical not is not function", false); } } #if 0 @@ -131,9 +141,7 @@ RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParse #endif else { - std::stringstream ss; - func->dumpTree(ss); - op = createUnsupported(ss.str(), "Function " + func->name + " is not supported", false); + op = createUnsupported(astToDebugString(func), "Function " + func->name + " is not supported", false); } return op; @@ -150,16 +158,16 @@ RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, AttrC const ASTFunction * where = static_cast(query.where_expression.get()); if (!where) { - std::stringstream ss; - query.where_expression->dumpTree(ss); - LOG_WARNING(log, String("Where expression is not ASTFunction, can not parse to rough set index. 
Expr: ") + ss.str()); + const String debug_string = ast::astToDebugString(query.where_expression.get()); + LOG_WARNING(log, "Where expression is not ASTFunction, can not parse to rough set index. Expr: " << debug_string); return op; } - std::stringstream ss; - where->dumpTree(ss); - std::string expr_tree = ss.str(); - LOG_TRACE(log, " where expr: " << expr_tree); + if (log->trace()) + { + std::string expr_tree = ast::astToDebugString(where); + LOG_TRACE(log, " where expr: " << expr_tree); + } op = ast::parseASTFunction(where, creator, log); From e9548d36f5dd7715915cccab39c0c4f45ad045e8 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 25 Nov 2019 18:35:10 +0800 Subject: [PATCH 11/13] address comments --- dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h | 2 +- dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h index 06c1609ee7f..23e0eaa62cd 100644 --- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h @@ -38,7 +38,7 @@ class ChunkBlockInputStream final : public IBlockInputStream for (const auto & is_skip : skip_chunks) num_skipped += is_skip; - LOG_DEBUG(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks"); + LOG_TRACE(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks"); } String getName() const override { return "Chunk"; } diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 9473b229b6c..4881c251ccb 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -32,8 +32,10 @@ class FilterParser enum RSFilterType { + Unsupported = 0, + // 
logical - Not = 0, + Not = 1, Or, And, // compare @@ -49,8 +51,6 @@ class FilterParser Like, NotLike, - - Unsupported = 254, }; static std::unordered_map scalar_func_rs_filter_map; From 9b36208e4979bc13a0d0807bc565485456962a4a Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 25 Nov 2019 18:58:55 +0800 Subject: [PATCH 12/13] fix compile error under Mac OSX --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 3376ec3c8cd..f88cffc1d3d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -996,7 +996,7 @@ inline void setColumnDefineDefaultValue(const AlterCommand & command, ColumnDefi time_t time = 0; ReadBufferFromMemory buf(date.data(), date.size()); readDateTimeText(time, buf); - return toField(time); + return toField((Int64)time); } case TypeIndex::Decimal32: { From a533eb3ccc7eca5e1b25a75762892d262af1ef6b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 25 Nov 2019 20:39:07 +0800 Subject: [PATCH 13/13] use dynamic_cast instead of static_cast --- .../DeltaMerge/FilterParser/FilterParser_ast.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp index 740f7e58bfa..449e92f17c5 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp @@ -50,13 +50,13 @@ parseASTCompareFunction(const ASTFunction * const func, const FilterParser::Attr constexpr UInt32 state_finish = state_has_column | state_has_literal; for (auto & child : func->arguments->children) { - if (auto * id = static_cast(child.get()); id != nullptr && id->kind == ASTIdentifier::Column) + if (auto * id = 
dynamic_cast(child.get()); id != nullptr && id->kind == ASTIdentifier::Column) { state |= state_has_column; const String & col_name = id->name; attr = creator(col_name); } - else if (auto * liter = static_cast(child.get()); liter != nullptr) + else if (auto * liter = dynamic_cast(child.get()); liter != nullptr) { state |= state_has_literal; value = liter->value; @@ -97,7 +97,7 @@ RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParse RSOperators children; for (const auto & child : func->arguments->children) { - ASTFunction * sub_func = static_cast(child.get()); + ASTFunction * sub_func = dynamic_cast(child.get()); if (sub_func != nullptr) { children.emplace_back(parseASTFunction(sub_func, creator, log)); @@ -119,7 +119,7 @@ RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParse astToDebugString(func), "logical not with " + DB::toString(func->arguments->children.size()) + " children", false); else { - if (ASTFunction * sub_func = static_cast(func->arguments->children[0].get()); sub_func != nullptr) + if (ASTFunction * sub_func = dynamic_cast(func->arguments->children[0].get()); sub_func != nullptr) op = createNot(parseASTFunction(sub_func, creator, log)); else op = createUnsupported(astToDebugString(func), "child of logical not is not function", false); @@ -155,7 +155,7 @@ RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, AttrC if (!query.where_expression) return op; - const ASTFunction * where = static_cast(query.where_expression.get()); + const ASTFunction * where = dynamic_cast(query.where_expression.get()); if (!where) { const String debug_string = ast::astToDebugString(query.where_expression.get());