Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-437 Support time zone in coprocessor #259

Merged
merged 8 commits into from
Sep 27, 2019
Merged
11 changes: 11 additions & 0 deletions dbms/src/Common/MyTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,15 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
to_time = to_my_time.toPackedUInt();
}

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone)
{
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
epoch += offset;
MyDateTime to_my_time(time_zone.toYear(epoch), time_zone.toMonth(epoch), time_zone.toDayOfMonth(epoch),
time_zone.toHour(epoch), time_zone.toMinute(epoch), time_zone.toSecond(epoch), from_my_time.micro_second);
to_time = to_my_time.toPackedUInt();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Common/MyTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ Field parseMyDateTime(const String & str);

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to);

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone);

} // namespace DB
57 changes: 46 additions & 11 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,27 @@ using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts,
Int64 tz_offset, const String & tz_name);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
{
if (args.size() < 1 || args.size() > 2)
throw Exception("Args not matched, should be: query[, region-id]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 1 || args.size() > 4)
throw Exception("Args not matched, should be: query[, region-id, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = InvalidRegionID;
if (args.size() == 2)
if (args.size() >= 2)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = get<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(
Expand All @@ -63,7 +70,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts);
start_ts, tz_offset, tz_name);

RegionPtr region;
if (region_id == InvalidRegionID)
Expand All @@ -86,23 +93,29 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)

BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
{
if (args.size() < 2 || args.size() > 3)
throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2 || args.size() > 5)
throw Exception("Args not matched, should be: query, region-id[, start-ts, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
if (args.size() == 3)
if (args.size() >= 3)
start_ts = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = safeGet<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts);
start_ts, tz_offset, tz_name);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
Expand Down Expand Up @@ -170,6 +183,14 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greater")
{
expr->set_sig(tipb::ScalarFuncSig::GTInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
{
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -239,10 +260,13 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts)
Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name)
{
DAGSchema schema;
tipb::DAGRequest dag_request;
dag_request.set_time_zone_name(tz_name);
dag_request.set_time_zone_offset(tz_offset);

dag_request.set_start_ts(start_ts);

Expand Down Expand Up @@ -291,8 +315,11 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ci.tp = column_info.tp;
ci.flag = column_info.flag;
ci.flen = column_info.flen;
ci.decimal = column_info.flen;
ci.decimal = column_info.decimal;
ci.elems = column_info.elems;
// a hack to test timestamp type in mock test
if (column_info.tp == TiDB::TypeDatetime && ci.decimal == 5)
ci.tp = TiDB::TypeTimestamp;
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, tipb::Expr *>{}});
Expand Down Expand Up @@ -430,6 +457,14 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull);
}
else if (func->name == "max")
{
agg_func->set_tp(tipb::Max);
if (agg_func->children_size() != 1)
throw Exception("udaf max only accept 1 argument");
auto ft = agg_func->mutable_field_type();
ft->set_tp(agg_func->children(0).field_type().tp());
}
// TODO: Other agg func.
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ catch (const LockException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.getStackTrace().toString());
recordError(e.code(), e.message());
}
catch (const std::exception & e)
Expand Down
131 changes: 119 additions & 12 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -44,7 +45,7 @@ static String genFuncString(const String & func_name, const Names & argument_nam
return ss.str();
}

DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_)
DAGExpressionAnalyzer::DAGExpressionAnalyzer(const std::vector<NameAndTypePair> && source_columns_, const Context & context_)
: source_columns(source_columns_),
context(context_),
after_agg(false),
Expand Down Expand Up @@ -177,28 +178,129 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const
}
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }
const std::vector<NameAndTypePair> & DAGExpressionAnalyzer::getCurrentInputColumns()
{
return after_agg ? aggregated_columns : source_columns;
}

void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project)
{
initChain(chain, getCurrentInputColumns());
for (auto name : final_project)
for (const auto & name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
}

void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
void constructTZExpr(tipb::Expr & tz_expr, const tipb::DAGRequest & rqst, bool from_utc)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
{
tz_expr.set_tp(tipb::ExprType::String);
tz_expr.set_val(rqst.time_zone_name());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(TiDB::TypeString);
field_type->set_flag(TiDB::ColumnFlagNotNull);
}
else
{
tz_expr.set_tp(tipb::ExprType::Int64);
std::stringstream ss;
encodeDAGInt64(from_utc ? rqst.time_zone_offset() : -rqst.time_zone_offset(), ss);
tz_expr.set_val(ss.str());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(TiDB::TypeLongLong);
field_type->set_flag(TiDB::ColumnFlagNotNull);
}
}

bool hasMeaningfulTZInfo(const tipb::DAGRequest &rqst)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
return rqst.time_zone_name() != "UTC";
if (rqst.has_time_zone_offset())
return rqst.has_time_zone_offset() != 0;
return false;
}

String DAGExpressionAnalyzer::appendTimeZoneCast(
const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions)
{
Names cast_argument_names;
cast_argument_names.push_back(ts_col);
cast_argument_names.push_back(tz_col);
String cast_expr_name = applyFunction(func_name, cast_argument_names, actions);
return cast_expr_name;
}

// add timezone cast after table scan, this is used for session level timezone support
// the basic idea of supporting session level timezone is that:
// 1. for every timestamp column used in the dag request, after reading it from table scan, we add
// cast function to convert its timezone to the timezone specified in DAG request
// 2. for every timestamp column that will be returned to TiDB, we add cast function to convert its
// timezone to UTC
// for timestamp columns without any transformation or calculation(e.g. select ts_col from table),
// this will introduce two useless casts, in order to avoid these redundant cast, when cast the ts
// column to the columns with session-level timezone info, the original ts columns with UTC
// timezone are still kept
// for DAG request that does not contain agg, the final project will select the ts column with UTC
// timezone, which is exactly what TiDB want
// for DAG request that contains agg, any ts column after agg has session-level timezone info(since the ts
// column with UTC timezone will never be used in during agg), all the column with ts datatype will
// convert back to UTC timezone
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain &chain, std::vector<bool> is_ts_column, const tipb::DAGRequest &rqst)
{
if (!hasMeaningfulTZInfo(rqst))
return false;

bool ret = false;
initChain(chain, getCurrentInputColumns());
ExpressionActionsPtr actions = chain.getLastActions();
tipb::Expr tz_expr;
constructTZExpr(tz_expr, rqst, true);
String tz_col;
String func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneFromUTC" : "ConvertTimeZoneByOffset";
for (size_t i = 0; i < is_ts_column.size(); i++)
{
if (is_ts_column[i])
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, actions);
String casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, func_name, actions);
source_columns.emplace_back(source_columns[i].name, source_columns[i].type);
source_columns[i].name = casted_name;
ret = true;
}
}
return ret;
}

void DAGExpressionAnalyzer::appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
NamesAndTypesList updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
auto agg_col_names = aggregated_columns.getNames();
bool need_append_timezone_cast = hasMeaningfulTZInfo(rqst);
tipb::Expr tz_expr;
if (need_append_timezone_cast)
constructTZExpr(tz_expr, rqst, false);
String tz_col;
String tz_cast_func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneToUTC" : "ConvertTimeZoneByOffset";
for (Int32 i = 0; i < aggregation.agg_func_size(); i++)
{
String & name = agg_col_names[i];
String & name = aggregated_columns[i].name;
String updated_name = appendCastIfNeeded(aggregation.agg_func(i), step.actions, name);
if (need_append_timezone_cast && aggregation.agg_func(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
Expand All @@ -208,14 +310,20 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
for (Int32 i = 0; i < aggregation.group_by_size(); i++)
{
String & name = agg_col_names[i + aggregation.agg_func_size()];
String & name = aggregated_columns[i + aggregation.agg_func_size()].name;
String updated_name = appendCastIfNeeded(aggregation.group_by(i), step.actions, name);
if (need_append_timezone_cast && aggregation.group_by(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
Expand All @@ -225,7 +333,7 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
Expand Down Expand Up @@ -263,11 +371,10 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
// first construct the second argument
tipb::Expr type_expr;
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
type_expr.set_val(expected_type->getName());
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(0xfe);
type_field_type->set_flag(1);
type_field_type->set_tp(TiDB::TypeString);
type_field_type->set_flag(TiDB::ColumnFlagNotNull);
getActions(type_expr, actions);

Names cast_argument_names;
Expand Down
Loading