Skip to content

Commit

Permalink
*: mini refactor (#5326)
Browse files Browse the repository at this point in the history
close #4739
  • Loading branch information
SeaRise authored Jul 11, 2022
1 parent 7a717b5 commit 4619605
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 262 deletions.
13 changes: 3 additions & 10 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -182,21 +183,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
for (auto & dag_col : remote_reader->getOutputSchema())
{
auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second);
ColumnWithTypeAndName col(tp, dag_col.first);
columns.emplace_back(col);
}
for (size_t i = 0; i < source_num; i++)
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(columns);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
}

Block getHeader() const override { return sample_block; }
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ std::unordered_map<String, tipb::ScalarFuncSig> func_name_to_sig({
{"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime},
{"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime},
{"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime},
{"concat", tipb::ScalarFuncSig::Concat},
{"round_int", tipb::ScalarFuncSig::RoundInt},
{"round_uint", tipb::ScalarFuncSig::RoundInt},
{"round_dec", tipb::ScalarFuncSig::RoundDec},
Expand Down Expand Up @@ -461,6 +462,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::Concat:
{
expr->set_sig(it_sig->second);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeString);
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::RoundInt:
case tipb::ScalarFuncSig::RoundWithFracInt:
{
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ extern const int DIVIDED_BY_ZERO;
extern const int INVALID_TIME;
} // namespace ErrorCodes

const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

bool strictSqlMode(UInt64 sql_mode)
{
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
Expand Down Expand Up @@ -75,6 +77,11 @@ std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap
return profile_streams_map;
}

void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit)
{
final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit);
}

void DAGContext::initExecutorIdToJoinIdMap()
{
// only mpp task has join executor
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count)
return stream_count > 0;
}

extern const String enableFineGrainedShuffleExtraInfo;

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down Expand Up @@ -308,6 +310,8 @@ class DAGContext
return sql_mode & f;
}

void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit);

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);
Expand Down
32 changes: 21 additions & 11 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1130,30 +1130,40 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
auto & step = initAndGetLastStep(chain);

NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info);

for (const auto & name : final_project)
{
step.required_output.push_back(name.first);
}
return final_project;
}

NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);
throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR);

bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets);
assert(need_append_type_cast_vec.size() == output_offsets.size());

auto & step = initAndGetLastStep(chain);

if (need_append_timezone_cast || need_append_type_cast)
{
// after appendCastForRootFinalProjection, source_columns has been modified.
appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
}

// generate project aliases from source_columns.
NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets);

for (const auto & name : final_project)
{
step.required_output.push_back(name.first);
}
return final_project;
return genRootFinalProjectAliases(column_prefix, output_offsets);
}

String DAGExpressionAnalyzer::alignReturnType(
Expand Down
58 changes: 33 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const String & column_prefix) const;

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

// Generate a project action for root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same.
NamesWithAliases appendFinalProjectForRootQueryBlock(
Expand All @@ -111,6 +113,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const String & column_prefix,
bool keep_session_timezone_info);

NamesWithAliases buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info);

String getActions(
const tipb::Expr & expr,
const ExpressionActionsPtr & actions,
Expand Down Expand Up @@ -153,17 +162,38 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Window & window,
size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void appendCastAfterAgg(
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif

String buildTupleFunctionForGroupConcat(
const tipb::Expr & expr,
SortDescription & sort_desc,
Expand All @@ -187,22 +217,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
bool empty_input_as_null);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void fillArgumentDetail(
const ExpressionActionsPtr & actions,
const tipb::Expr & arg,
Expand Down Expand Up @@ -275,12 +289,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const String & column_name);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

NamesWithAliases genRootFinalProjectAliases(
const String & column_prefix,
const std::vector<Int32> & output_offsets) const;
Expand Down
Loading

0 comments on commit 4619605

Please sign in to comment.