Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: mini refactor #5326

Merged
merged 11 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -182,21 +183,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
for (auto & dag_col : remote_reader->getOutputSchema())
{
auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second);
ColumnWithTypeAndName col(tp, dag_col.first);
columns.emplace_back(col);
}
for (size_t i = 0; i < source_num; i++)
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(columns);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
}

/// Header of this stream: the sample block assembled in the constructor
/// from the remote reader's output schema.
Block getHeader() const override
{
    return sample_block;
}
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ std::unordered_map<String, tipb::ScalarFuncSig> func_name_to_sig({
{"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime},
{"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime},
{"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime},
{"concat", tipb::ScalarFuncSig::Concat},
{"round_int", tipb::ScalarFuncSig::RoundInt},
{"round_uint", tipb::ScalarFuncSig::RoundInt},
{"round_dec", tipb::ScalarFuncSig::RoundDec},
Expand Down Expand Up @@ -461,6 +462,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::Concat:
{
expr->set_sig(it_sig->second);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeString);
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::RoundInt:
case tipb::ScalarFuncSig::RoundWithFracInt:
{
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ extern const int DIVIDED_BY_ZERO;
extern const int INVALID_TIME;
} // namespace ErrorCodes

const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

bool strictSqlMode(UInt64 sql_mode)
{
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
Expand Down Expand Up @@ -75,6 +77,11 @@ std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap
return profile_streams_map;
}

void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit)
{
final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit);
}

void DAGContext::initExecutorIdToJoinIdMap()
{
// only mpp task has join executor
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count)
return stream_count > 0;
}

extern const String enableFineGrainedShuffleExtraInfo;

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down Expand Up @@ -308,6 +310,8 @@ class DAGContext
return sql_mode & f;
}

void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit);

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);
Expand Down
32 changes: 21 additions & 11 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1130,30 +1130,40 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
auto & step = initAndGetLastStep(chain);

NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info);

for (const auto & name : final_project)
{
step.required_output.push_back(name.first);
}
return final_project;
}

NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);
throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR);

bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets);
assert(need_append_type_cast_vec.size() == output_offsets.size());

auto & step = initAndGetLastStep(chain);

if (need_append_timezone_cast || need_append_type_cast)
{
// after appendCastForRootFinalProjection, source_columns has been modified.
appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
}

// generate project aliases from source_columns.
NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets);

for (const auto & name : final_project)
{
step.required_output.push_back(name.first);
}
return final_project;
return genRootFinalProjectAliases(column_prefix, output_offsets);
}

String DAGExpressionAnalyzer::alignReturnType(
Expand Down
58 changes: 33 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const String & column_prefix) const;

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

// Generate a project action for root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same.
NamesWithAliases appendFinalProjectForRootQueryBlock(
Expand All @@ -111,6 +113,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const String & column_prefix,
bool keep_session_timezone_info);

NamesWithAliases buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info);

String getActions(
const tipb::Expr & expr,
const ExpressionActionsPtr & actions,
Expand Down Expand Up @@ -153,17 +162,38 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Window & window,
size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void appendCastAfterAgg(
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif

String buildTupleFunctionForGroupConcat(
const tipb::Expr & expr,
SortDescription & sort_desc,
Expand All @@ -187,22 +217,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
bool empty_input_as_null);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void fillArgumentDetail(
const ExpressionActionsPtr & actions,
const tipb::Expr & arg,
Expand Down Expand Up @@ -275,12 +289,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const String & column_name);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

NamesWithAliases genRootFinalProjectAliases(
const String & column_prefix,
const std::vector<Int32> & output_offsets) const;
Expand Down
Loading