diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 6c6f3d94d60..8f519135f78 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -12,6 +13,8 @@ #include #include #include +#include +#include #include #include #include @@ -446,7 +449,151 @@ BlockInputStreamPtr dbgFuncMockTiDBQuery(Context & context, const ASTs & args) return executeQuery(context, region_id, properties, query_tasks, func_wrap_output_stream); } -void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t collator_id) +void literalToPB(tipb::Expr * expr, const Field & value, uint32_t collator_id) +{ + std::stringstream ss; + switch (value.getType()) + { + case Field::Types::Which::Null: + { + expr->set_tp(tipb::Null); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeNull); + ft->set_collate(collator_id); + // Null literal expr doesn't need value. + break; + } + case Field::Types::Which::UInt64: + { + expr->set_tp(tipb::Uint64); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeLongLong); + ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); + ft->set_collate(collator_id); + encodeDAGUInt64(value.get(), ss); + break; + } + case Field::Types::Which::Int64: + { + expr->set_tp(tipb::Int64); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeLongLong); + ft->set_flag(TiDB::ColumnFlagNotNull); + ft->set_collate(collator_id); + encodeDAGInt64(value.get(), ss); + break; + } + case Field::Types::Which::Float64: + { + expr->set_tp(tipb::Float64); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeFloat); + ft->set_flag(TiDB::ColumnFlagNotNull); + ft->set_collate(collator_id); + encodeDAGFloat64(value.get(), ss); + break; + } + case Field::Types::Which::Decimal32: + case Field::Types::Which::Decimal64: + case Field::Types::Which::Decimal128: + case Field::Types::Which::Decimal256: + { + expr->set_tp(tipb::MysqlDecimal); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeNewDecimal); + ft->set_flag(TiDB::ColumnFlagNotNull); + ft->set_collate(collator_id); + encodeDAGDecimal(value, ss); + break; + } + case Field::Types::Which::String: + { + expr->set_tp(tipb::String); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeString); + ft->set_flag(TiDB::ColumnFlagNotNull); + ft->set_collate(collator_id); + // TODO: Align with TiDB. + encodeDAGBytes(value.get(), ss); + break; + } + default: + throw Exception(String("Unsupported literal type: ") + value.getTypeName(), ErrorCodes::LOGICAL_ERROR); + } + expr->set_val(ss.str()); +} + +String getFunctionNameForConstantFolding(tipb::Expr * expr) +{ + // todo support more function for constant folding + switch (expr->sig()) + { + case tipb::ScalarFuncSig::CastStringAsTime: + return "toMyDateTimeOrNull"; + default: + return ""; + } +} + +void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & context) +{ + if (expr->tp() == tipb::ScalarFunc) + { + bool all_const = true; + for (auto c : expr->children()) + { + if (!isLiteralExpr(c)) + { + all_const = false; + break; + } + } + if (!all_const) + return; + DataTypes arguments_types; + ColumnsWithTypeAndName argument_columns; + for (auto & c : expr->children()) + { + Field value = decodeLiteral(c); + DataTypePtr flash_type = applyVisitor(FieldToDataType(), value); + DataTypePtr target_type = inferDataType4Literal(c); + ColumnWithTypeAndName column; + column.column = target_type->createColumnConst(1, convertFieldToType(value, *target_type, flash_type.get())); + column.name = exprToString(c, {}) + "_" + target_type->getName(); + column.type = target_type; + arguments_types.emplace_back(target_type); + argument_columns.emplace_back(column); + } + auto func_name = getFunctionNameForConstantFolding(expr); + if (func_name.empty()) + return; + const auto & function_builder_ptr = FunctionFactory::instance().get(func_name, context); + auto function_ptr = function_builder_ptr->build(argument_columns); + if (function_ptr->isSuitableForConstantFolding()) + { + Block block_with_constants(argument_columns); + ColumnNumbers argument_numbers(arguments_types.size()); + for (size_t i = 0, size = arguments_types.size(); i < size; i++) + argument_numbers[i] = i; + size_t result_pos = argument_numbers.size(); + block_with_constants.insert({nullptr, function_ptr->getReturnType(), "result"}); + function_ptr->execute(block_with_constants, argument_numbers, result_pos); + const auto & result_column = block_with_constants.getByPosition(result_pos).column; + if (result_column->isColumnConst()) + { + auto updated_value = (*result_column)[0]; + tipb::FieldType orig_field_type = expr->field_type(); + expr->Clear(); + literalToPB(expr, updated_value, collator_id); + expr->clear_field_type(); + auto * field_type = expr->mutable_field_type(); + (*field_type) = orig_field_type; + } + } + } +} + +void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t collator_id, const Context & context) { if (ASTIdentifier * id = typeid_cast(ast.get())) { @@ -529,13 +676,13 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co for (const auto & c : tuple_func->arguments->children) { tipb::Expr * child = in_expr->add_children(); - astToPB(input, c, child, collator_id); + astToPB(input, c, child, collator_id, context); } } else { tipb::Expr * child = in_expr->add_children(); - astToPB(input, child_ast, child, collator_id); + astToPB(input, child_ast, child, collator_id, context); } } return; @@ -551,7 +698,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co { const auto & child_ast = func->arguments->children[i]; tipb::Expr * child = expr->add_children(); - astToPB(input, child_ast, child, collator_id); + astToPB(input, child_ast, child, collator_id, context); // todo should infer the return type based on all input types if ((it_sig->second == tipb::ScalarFuncSig::IfInt && i == 1) || (it_sig->second != tipb::ScalarFuncSig::IfInt && i == 0)) @@ -569,7 +716,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co for (const auto & child_ast : func->arguments->children) { tipb::Expr * child = expr->add_children(); - astToPB(input, child_ast, child, collator_id); + astToPB(input, child_ast, child, collator_id, context); } // for like need to add the third argument tipb::Expr * constant_expr = expr->add_children(); @@ -627,81 +774,13 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co for (const auto & child_ast : func->arguments->children) { tipb::Expr * child = expr->add_children(); - astToPB(input, child_ast, child, collator_id); + astToPB(input, child_ast, child, collator_id, context); } + foldConstant(expr, collator_id, context); } else if (ASTLiteral * lit = typeid_cast(ast.get())) { - std::stringstream ss; - switch (lit->value.getType()) - { - case Field::Types::Which::Null: - { - expr->set_tp(tipb::Null); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeNull); - ft->set_collate(collator_id); - // Null literal expr doesn't need value. - break; - } - case Field::Types::Which::UInt64: - { - expr->set_tp(tipb::Uint64); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeLongLong); - ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); - ft->set_collate(collator_id); - encodeDAGUInt64(lit->value.get(), ss); - break; - } - case Field::Types::Which::Int64: - { - expr->set_tp(tipb::Int64); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeLongLong); - ft->set_flag(TiDB::ColumnFlagNotNull); - ft->set_collate(collator_id); - encodeDAGInt64(lit->value.get(), ss); - break; - } - case Field::Types::Which::Float64: - { - expr->set_tp(tipb::Float64); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeFloat); - ft->set_flag(TiDB::ColumnFlagNotNull); - ft->set_collate(collator_id); - encodeDAGFloat64(lit->value.get(), ss); - break; - } - case Field::Types::Which::Decimal32: - case Field::Types::Which::Decimal64: - case Field::Types::Which::Decimal128: - case Field::Types::Which::Decimal256: - { - expr->set_tp(tipb::MysqlDecimal); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeNewDecimal); - ft->set_flag(TiDB::ColumnFlagNotNull); - ft->set_collate(collator_id); - encodeDAGDecimal(lit->value, ss); - break; - } - case Field::Types::Which::String: - { - expr->set_tp(tipb::String); - auto * ft = expr->mutable_field_type(); - ft->set_tp(TiDB::TypeString); - ft->set_flag(TiDB::ColumnFlagNotNull); - ft->set_collate(collator_id); - // TODO: Align with TiDB. - encodeDAGBytes(lit->value.get(), ss); - break; - } - default: - throw Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR); - } - expr->set_val(ss.str()); + literalToPB(expr, lit->value, collator_id); } else { @@ -804,7 +883,8 @@ struct Executor { index_++; } - virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) = 0; + virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) + = 0; virtual void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) { @@ -822,7 +902,7 @@ struct ExchangeSender : Executor : Executor(index, "exchange_sender_" + std::to_string(index), output), type(type_), partition_keys(partition_keys_) {} void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeExchangeSender); tipb_executor->set_executor_id(name); @@ -847,7 +927,7 @@ struct ExchangeSender : Executor meta.AppendToString(meta_string); } auto * child_executor = exchange_sender->mutable_child(); - return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } }; @@ -856,7 +936,7 @@ struct ExchangeReceiver : Executor TaskMetas task_metas; ExchangeReceiver(size_t & index, const DAGSchema & output) : Executor(index, "exchange_receiver_" + std::to_string(index), output) {} void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override { tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver); tipb_executor->set_executor_id(name); @@ -899,7 +979,7 @@ struct TableScan : public Executor [&](const auto & field) { return used_columns.count(field.first) == 0; }), output_schema.end()); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override { tipb_executor->set_tp(tipb::ExecType::TypeTableScan); tipb_executor->set_executor_id(name); @@ -938,7 +1018,7 @@ struct Selection : public Executor Selection(size_t & index_, const DAGSchema & output_schema_, std::vector && conditions_) : Executor(index_, "selection_" + std::to_string(index_), output_schema_), conditions(std::move(conditions_)) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeSelection); tipb_executor->set_executor_id(name); @@ -946,10 +1026,10 @@ struct Selection : public Executor for (auto & expr : conditions) { tipb::Expr * cond = sel->add_conditions(); - astToPB(children[0]->output_schema, expr, cond, collator_id); + astToPB(children[0]->output_schema, expr, cond, collator_id, context); } auto * child_executor = sel->mutable_child(); - return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } void columnPrune(std::unordered_set & used_columns) override { @@ -968,7 +1048,7 @@ struct TopN : public Executor TopN(size_t & index_, const DAGSchema & output_schema_, std::vector && order_columns_, size_t limit_) : Executor(index_, "topn_" + std::to_string(index_), output_schema_), order_columns(std::move(order_columns_)), limit(limit_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeTopN); tipb_executor->set_executor_id(name); @@ -981,11 +1061,11 @@ struct TopN : public Executor tipb::ByItem * by = topn->add_order_by(); by->set_desc(elem->direction < 0); tipb::Expr * expr = by->mutable_expr(); - astToPB(children[0]->output_schema, elem->children[0], expr, collator_id); + astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context); } topn->set_limit(limit); auto * child_executor = topn->mutable_child(); - return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } void columnPrune(std::unordered_set & used_columns) override { @@ -1003,14 +1083,14 @@ struct Limit : public Executor Limit(size_t & index_, const DAGSchema & output_schema_, size_t limit_) : Executor(index_, "limit_" + std::to_string(index_), output_schema_), limit(limit_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeLimit); tipb_executor->set_executor_id(name); tipb::Limit * lt = tipb_executor->mutable_limit(); lt->set_limit(limit); auto * child_executor = lt->mutable_child(); - return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } void columnPrune(std::unordered_set & used_columns) override { @@ -1036,7 +1116,7 @@ struct Aggregation : public Executor gby_exprs(std::move(gby_exprs_)), is_final_mode(is_final_mode_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeAggregation); tipb_executor->set_executor_id(name); @@ -1053,7 +1133,7 @@ struct Aggregation : public Executor for (const auto & arg : func->arguments->children) { tipb::Expr * arg_expr = agg_func->add_children(); - astToPB(input_schema, arg, arg_expr, collator_id); + astToPB(input_schema, arg, arg_expr, collator_id, context); } if (func->name == "count") @@ -1105,11 +1185,11 @@ struct Aggregation : public Executor for (const auto & child : gby_exprs) { tipb::Expr * gby = agg->add_group_by(); - astToPB(input_schema, child, gby, collator_id); + astToPB(input_schema, child, gby, collator_id, context); } auto * child_executor = agg->mutable_child(); - return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } void columnPrune(std::unordered_set & used_columns) override { @@ -1191,7 +1271,7 @@ struct Project : public Executor Project(size_t & index_, const DAGSchema & output_schema_, std::vector && exprs_) : Executor(index_, "project_" + std::to_string(index_), output_schema_), exprs(std::move(exprs_)) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeProjection); tipb_executor->set_executor_id(name); @@ -1215,10 +1295,10 @@ struct Project : public Executor continue; } tipb::Expr * expr = proj->add_exprs(); - astToPB(input_schema, child, expr, collator_id); + astToPB(input_schema, child, expr, collator_id, context); } auto * children_executor = proj->mutable_child(); - return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info); + return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context); } void columnPrune(std::unordered_set & used_columns) override { @@ -1351,7 +1431,7 @@ struct Join : Executor } } } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info) override + bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override { tipb_executor->set_tp(tipb::ExecType::TypeJoin); tipb_executor->set_executor_id(name); @@ -1378,9 +1458,9 @@ struct Join : Executor fillJoinKeyAndFieldType(key, children[1]->output_schema, join->add_right_join_keys(), join->add_build_types(), collator_id); } auto * left_child_executor = join->add_children(); - children[0]->toTiPBExecutor(left_child_executor, collator_id, mpp_info); + children[0]->toTiPBExecutor(left_child_executor, collator_id, mpp_info, context); auto * right_child_executor = join->add_children(); - return children[1]->toTiPBExecutor(right_child_executor, collator_id, mpp_info); + return children[1]->toTiPBExecutor(right_child_executor, collator_id, mpp_info, context); } void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) override @@ -1828,7 +1908,7 @@ struct QueryFragment task_ids(std::move(task_ids_)) {} - QueryTask toQueryTask(const DAGProperties & properties, MPPInfo & mpp_info) + QueryTask toQueryTask(const DAGProperties & properties, MPPInfo & mpp_info, const Context & context) { std::shared_ptr dag_request_ptr = std::make_shared(); tipb::DAGRequest & dag_request = *dag_request_ptr; @@ -1853,12 +1933,12 @@ struct QueryFragment for (size_t i = 0; i < root_executor->output_schema.size(); i++) dag_request.add_output_offsets(i); auto * root_tipb_executor = dag_request.mutable_root_executor(); - root_executor->toTiPBExecutor(root_tipb_executor, properties.collator, mpp_info); + root_executor->toTiPBExecutor(root_tipb_executor, properties.collator, mpp_info, context); return QueryTask(dag_request_ptr, table_id, root_executor->output_schema, mpp_info.sender_target_task_ids.size() == 0 ? DAG : MPP_DISPATCH, mpp_info.task_id, mpp_info.partition_id, is_top_fragment); } - QueryTasks toQueryTasks(const DAGProperties & properties) + QueryTasks toQueryTasks(const DAGProperties & properties, const Context & context) { QueryTasks ret; if (properties.is_mpp_query) @@ -1867,13 +1947,13 @@ struct QueryFragment { MPPInfo mpp_info( properties.start_ts, partition_id, task_ids[partition_id], sender_target_task_ids, receiver_source_task_ids_map); - ret.push_back(toQueryTask(properties, mpp_info)); + ret.push_back(toQueryTask(properties, mpp_info, context)); } } else { MPPInfo mpp_info(properties.start_ts, -1, -1, {}, {}); - ret.push_back(toQueryTask(properties, mpp_info)); + ret.push_back(toQueryTask(properties, mpp_info, context)); } return ret; } @@ -1961,13 +2041,14 @@ QueryFragments queryPlanToQueryFragments(const DAGProperties & properties, Execu } } -QueryTasks queryPlanToQueryTasks(const DAGProperties & properties, ExecutorPtr root_executor, size_t & executor_index) +QueryTasks queryPlanToQueryTasks( + const DAGProperties & properties, ExecutorPtr root_executor, size_t & executor_index, const Context & context) { QueryFragments fragments = queryPlanToQueryFragments(properties, root_executor, executor_index); QueryTasks tasks; for (auto & fragment : fragments) { - auto t = fragment.toQueryTasks(properties); + auto t = fragment.toQueryTasks(properties, context); tasks.insert(tasks.end(), t.begin(), t.end()); } return tasks; @@ -2196,7 +2277,7 @@ std::tuple compileQuery( used_columns.emplace(schema.first); root_executor->columnPrune(used_columns); - return std::make_tuple(queryPlanToQueryTasks(properties, root_executor, executor_index), func_wrap_output_stream); + return std::make_tuple(queryPlanToQueryTasks(properties, root_executor, executor_index, context), func_wrap_output_stream); } tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, diff --git a/tests/delta-merge-test/query/expr/cast_as_time.test b/tests/delta-merge-test/query/expr/cast_as_time.test index fe955a8bee3..512a550bb60 100644 --- a/tests/delta-merge-test/query/expr/cast_as_time.test +++ b/tests/delta-merge-test/query/expr/cast_as_time.test @@ -17,6 +17,11 @@ │ 1 │ 20201203 │ 2020-12-03 00:00:00 │ 2020-12-03 01:00:00 │ 2020-12-03 01:00:00 │ 2020-12-03 01:00:00 │ 2020-09-15 01:00:00 │ └──────────┴──────────┴──────────────────────┴───────────────────────┴──────────────────────────┴─────────────────────────┴───────────────────────┘ +=> DBGInvoke dag('select * from default.test where e = cast_string_datetime(\'2020-09-15 01:00:00\')', 4,'encode_type:chunk') +┌───test.a─┬─────────test.b─┬─test.c────────────────┬─test.d──────────────┬──────────────test.e─┐ +│ 20201203 │ 20201203010000 │ 20201203010000.000000 │ 2020-12-03 01:00:00 │ 2020-09-15 01:00:00 │ +└──────────┴────────────────┴───────────────────────┴─────────────────────┴─────────────────────┘ + # TODO: # => DBGInvoke dag('select count(1) from default.test group by a, cast_int_date(a), cast_real_date(b), cast_decimal_date(c), cast_string_date(d), cast_time_date(e)', 4,'encode_type:chunk')