diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index 3739b57d82d..a7ecf12d8cb 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -60,7 +60,7 @@ FilterBlockInputStream::FilterBlockInputStream( Block FilterBlockInputStream::getTotals() { - if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) + if (auto * child = dynamic_cast(&*children.back())) { totals = child->getTotals(); expression->executeOnTotals(totals); diff --git a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp index 91fd34bfff4..61808b48c50 100644 --- a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include namespace DB { @@ -25,4 +26,26 @@ Block HashJoinBuildBlockInputStream::readImpl() return block; } +void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const +{ + static const std::unordered_map join_type_map{ + {ASTTableJoin::Kind::Inner, "Inner"}, + {ASTTableJoin::Kind::Left, "Left"}, + {ASTTableJoin::Kind::Right, "Right"}, + {ASTTableJoin::Kind::Full, "Full"}, + {ASTTableJoin::Kind::Cross, "Cross"}, + {ASTTableJoin::Kind::Comma, "Comma"}, + {ASTTableJoin::Kind::Anti, "Anti"}, + {ASTTableJoin::Kind::LeftSemi, "Left_Semi"}, + {ASTTableJoin::Kind::LeftAnti, "Left_Anti"}, + {ASTTableJoin::Kind::Cross_Left, "Cross_Left"}, + {ASTTableJoin::Kind::Cross_Right, "Cross_Right"}, + {ASTTableJoin::Kind::Cross_Anti, "Cross_Anti"}, + {ASTTableJoin::Kind::Cross_LeftSemi, "Cross_LeftSemi"}, + {ASTTableJoin::Kind::Cross_LeftAnti, "Cross_LeftAnti"}}; + auto join_type_it = join_type_map.find(join->getKind()); + if (join_type_it == join_type_map.end()) + throw TiFlashException("Unknown join type", Errors::Coprocessor::Internal); + buffer.fmtAppend(", join_kind = {}", join_type_it->second); +} } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h index 57b505f5237..dbfc7f30310 100644 --- a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h @@ -41,6 +41,7 @@ class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; + void appendInfo(FmtBuffer & buffer) const override; private: JoinPtr join; diff --git a/dbms/src/DataStreams/IBlockInputStream.cpp b/dbms/src/DataStreams/IBlockInputStream.cpp index 57dbe0e6ad0..a05fbf83c96 100644 --- a/dbms/src/DataStreams/IBlockInputStream.cpp +++ b/dbms/src/DataStreams/IBlockInputStream.cpp @@ -77,15 +77,17 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const return res + 1; } - void IBlockInputStream::dumpTree(FmtBuffer & buffer, size_t indent, size_t multiplier) { - // todo append getHeader().dumpStructure() buffer.fmtAppend( - "{}{}{}\n", + "{}{}{}", String(indent, ' '), getName(), multiplier > 1 ? fmt::format(" x {}", multiplier) : ""); + if (!extra_info.empty()) + buffer.fmtAppend(": <{}>", extra_info); + appendInfo(buffer); + buffer.append("\n"); ++indent; /// If the subtree is repeated several times, then we output it once with the multiplier. diff --git a/dbms/src/DataStreams/IBlockInputStream.h b/dbms/src/DataStreams/IBlockInputStream.h index 75fdffb3d29..472eac282d4 100644 --- a/dbms/src/DataStreams/IBlockInputStream.h +++ b/dbms/src/DataStreams/IBlockInputStream.h @@ -135,6 +135,7 @@ class IBlockInputStream : private boost::noncopyable */ void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); } + void setExtraInfo(String info) { extra_info = info; } template void forEachChild(F && f) @@ -176,6 +177,8 @@ class IBlockInputStream : private boost::noncopyable } } + virtual void appendInfo(FmtBuffer & /*buffer*/) const {}; + protected: BlockInputStreams children; mutable std::shared_mutex children_mutex; @@ -188,6 +191,9 @@ class IBlockInputStream : private boost::noncopyable mutable std::mutex tree_id_mutex; mutable String tree_id; + /// The info that hints why the inputStream is needed to run. + String extra_info; + /// Get text with names of this source and the entire subtree, this function should only be called after the /// InputStream tree is constructed. String getTreeID() const; diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index 81c31fc5d77..4ec6157257c 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -83,4 +83,8 @@ Block LimitBlockInputStream::readImpl() return res; } +void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const +{ + buffer.fmtAppend(", limit = {}", limit); +} } // namespace DB diff --git a/dbms/src/DataStreams/LimitBlockInputStream.h b/dbms/src/DataStreams/LimitBlockInputStream.h index 21978773daf..e6a7013210b 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.h +++ b/dbms/src/DataStreams/LimitBlockInputStream.h @@ -43,6 +43,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; + void appendInfo(FmtBuffer & buffer) const override; private: size_t limit; diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 0975ace963a..e79426f686e 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -287,5 +287,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue +#include +#include +#include #include #include #include @@ -275,4 +278,9 @@ void ParallelAggregatingBlockInputStream::execute() no_more_keys); } +void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const +{ + buffer.fmtAppend(", max_threads: {}, final: {}", max_threads, final ? "true" : "false"); +} + } // namespace DB diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 398c3d35bbc..41e61786370 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -50,7 +50,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream Block getHeader() const override; - virtual void collectNewThreadCountOfThisLevel(int & cnt) override + void collectNewThreadCountOfThisLevel(int & cnt) override { cnt += processor.getMaxThreads(); } @@ -62,6 +62,8 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream } Block readImpl() override; + void appendInfo(FmtBuffer & buffer) const override; + private: const LoggerPtr log; diff --git a/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp b/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp index 30f520fdec3..4069f3818a8 100644 --- a/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include #include +#include namespace DB { - - Block PartialSortingBlockInputStream::readImpl() { Block res = children.back()->read(); @@ -28,5 +26,8 @@ Block PartialSortingBlockInputStream::readImpl() return res; } - +void PartialSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const +{ + buffer.fmtAppend(": limit = {}", limit); } +} // namespace DB diff --git a/dbms/src/DataStreams/PartialSortingBlockInputStream.h b/dbms/src/DataStreams/PartialSortingBlockInputStream.h index 4a7a62474df..1b2f554ef94 100644 --- a/dbms/src/DataStreams/PartialSortingBlockInputStream.h +++ b/dbms/src/DataStreams/PartialSortingBlockInputStream.h @@ -50,6 +50,7 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; + void appendInfo(FmtBuffer & buffer) const override; private: SortDescription description; diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index c1e29617586..f8e313a25be 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -60,11 +61,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { - for (auto & execution_summary : resp.execution_summaries()) + for (const auto & execution_summary : resp.execution_summaries()) { if (execution_summary.has_executor_id()) { - auto & executor_id = execution_summary.executor_id(); + const auto & executor_id = execution_summary.executor_id(); execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns(); execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows(); execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations(); @@ -84,11 +85,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return; } auto & execution_summaries_map = execution_summaries[index]; - for (auto & execution_summary : resp.execution_summaries()) + for (const auto & execution_summary : resp.execution_summaries()) { if (execution_summary.has_executor_id()) { - auto & executor_id = execution_summary.executor_id(); + const auto & executor_id = execution_summary.executor_id(); if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end())) { LOG_FMT_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id); @@ -224,12 +225,12 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream bool isStreamingCall() const { return is_streaming_reader; } const std::vector & getConnectionProfileInfos() const { return connection_profile_infos; } - virtual void collectNewThreadCountOfThisLevel(int & cnt) override + void collectNewThreadCountOfThisLevel(int & cnt) override { remote_reader->collectNewThreadCount(cnt); } - virtual void resetNewThreadCountCompute() override + void resetNewThreadCountCompute() override { if (collected) { @@ -239,11 +240,24 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream } protected: - virtual void readSuffixImpl() override + void readSuffixImpl() override { LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows); remote_reader->close(); } + + void appendInfo(FmtBuffer & buffer) const override + { + buffer.append(": schema: {"); + buffer.joinStr( + sample_block.begin(), + sample_block.end(), + [](const auto & arg, FmtBuffer & fb) { + fb.fmtAppend("<{}, {}>", arg.name, arg.type->getName()); + }, + ", "); + buffer.append("}"); + } }; using ExchangeReceiverInputStream = TiRemoteBlockInputStream; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index d031ff103ff..e3e5efdcbc6 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -174,7 +174,7 @@ class DAGContext explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency) : dag_request(&dag_request_) , initialize_concurrency(concurrency) - , is_mpp_task(false) + , is_mpp_task(true) , is_root_mpp_task(false) , tunnel_set(nullptr) , log(Logger::get(log_identifier)) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 4d8faffde6c..5fac49faaed 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -258,20 +258,22 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1; /// build side streams - executeExpression(build_pipeline, build_side_prepare_actions); + executeExpression(build_pipeline, build_side_prepare_actions, "append join key and join filters for build side"); // add a HashJoinBuildBlockInputStream to build a shared hash table auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency); build_pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, join_ptr, get_concurrency_build_index(), log->identifier()); + stream->setExtraInfo( + fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id)); }); - executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/true); + executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/true, "for join"); right_query.source = build_pipeline.firstStream(); right_query.join = join_ptr; join_ptr->init(right_query.source->getHeader(), join_build_concurrency); /// probe side streams - executeExpression(probe_pipeline, probe_side_prepare_actions); + executeExpression(probe_pipeline, probe_side_prepare_actions, "append join key and join filters for probe side"); NamesAndTypes source_columns; for (const auto & p : probe_pipeline.firstStream()->getHeader()) source_columns.emplace_back(p.name, p.type); @@ -291,12 +293,16 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & i, not_joined_concurrency, settings.max_block_size); + non_joined_stream->setExtraInfo("add stream with non_joined_data if full_or_right_join"); pipeline.streams_with_non_joined_data.push_back(non_joined_stream); join_execute_info.non_joined_streams.push_back(non_joined_stream); } } for (auto & stream : pipeline.streams) + { stream = std::make_shared(stream, chain.getLastActions(), log->identifier()); + stream->setExtraInfo(fmt::format("join probe, join_executor_id = {}", query_block.source_name)); + } /// add a project to remove all the useless column NamesWithAliases project_cols; @@ -306,7 +312,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & /// it is guaranteed by its children query block project_cols.emplace_back(c.name, c.name); } - executeProject(pipeline, project_cols); + executeProject(pipeline, project_cols, "remove useless column after join"); analyzer = std::make_unique(std::move(join_output_columns), context); } @@ -320,19 +326,22 @@ void DAGQueryBlockInterpreter::recordJoinExecuteInfo(size_t build_side_index, co dagContext().getJoinExecuteInfoMap()[query_block.source_name] = std::move(join_execute_info); } -void DAGQueryBlockInterpreter::executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expr, String & filter_column) +void DAGQueryBlockInterpreter::executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expr, String & filter_column, const String & extra_info) { - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expr, filter_column, log->identifier()); }); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expr, filter_column, log->identifier()); + stream->setExtraInfo(extra_info); + }); } void DAGQueryBlockInterpreter::executeWindow( DAGPipeline & pipeline, WindowDescription & window_description) { - executeExpression(pipeline, window_description.before_window); + executeExpression(pipeline, window_description.before_window, "before window"); /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log); + executeUnion(pipeline, max_streams, log, false, "merge into one for window input"); assert(pipeline.streams.size() == 1); pipeline.firstStream() = std::make_shared(pipeline.firstStream(), window_description, log->identifier()); } @@ -345,7 +354,10 @@ void DAGQueryBlockInterpreter::executeAggregation( AggregateDescriptions & aggregate_descriptions, bool is_final_agg) { - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expression_actions_ptr, log->identifier()); }); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expression_actions_ptr, log->identifier()); + stream->setExtraInfo("before aggregation"); + }); Block before_agg_header = pipeline.firstStream()->getHeader(); @@ -398,11 +410,14 @@ void DAGQueryBlockInterpreter::executeAggregation( } } -void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr) +void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info) { if (!expressionActionsPtr->getActions().empty()) { - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expressionActionsPtr, log->identifier()); }); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expressionActionsPtr, log->identifier()); + stream->setExtraInfo(extra_info); + }); } } @@ -434,7 +449,7 @@ void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescript }); /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log); + executeUnion(pipeline, max_streams, log, false, "for partial order"); /// Merge the sorted blocks. pipeline.firstStream() = std::make_shared( @@ -465,6 +480,7 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) BlockInputStreamPtr stream = std::make_shared(it->second, log->identifier(), query_block.source_name); exchange_receiver_io_input_streams.push_back(stream); stream = std::make_shared(stream, 8192, 0, log->identifier()); + stream->setExtraInfo("squashing after exchange receiver"); pipeline.streams.push_back(stream); } NamesAndTypes source_columns; @@ -511,8 +527,11 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti output_columns.emplace_back(alias, col.type); project_cols.emplace_back(col.name, alias); } - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, chain.getLastActions(), log->identifier()); }); - executeProject(pipeline, project_cols); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, chain.getLastActions(), log->identifier()); + stream->setExtraInfo("before projection"); + }); + executeProject(pipeline, project_cols, "projection"); analyzer = std::make_unique(std::move(output_columns), context); } @@ -526,7 +545,7 @@ void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb:: DAGExpressionAnalyzer dag_analyzer(input_columns, context); WindowDescription window_description = dag_analyzer.buildWindowDescription(window); executeWindow(pipeline, window_description); - executeExpression(pipeline, window_description.after_window); + executeExpression(pipeline, window_description.after_window, "cast after window"); analyzer = std::make_unique(window_description.after_window_columns, context); } @@ -613,7 +632,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) if (res.before_where) { // execute where - executeWhere(pipeline, res.before_where, res.filter_column_name); + executeWhere(pipeline, res.before_where, res.filter_column_name, "execute where"); recordProfileStreams(pipeline, query_block.selection_name); } @@ -633,12 +652,12 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) if (res.before_having) { // execute having - executeWhere(pipeline, res.before_having, res.having_column_name); + executeWhere(pipeline, res.before_having, res.having_column_name, "execute having"); recordProfileStreams(pipeline, query_block.having_name); } if (res.before_order_and_select) { - executeExpression(pipeline, res.before_order_and_select); + executeExpression(pipeline, res.before_order_and_select, "before order and select"); } if (!res.order_columns.empty()) @@ -649,7 +668,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } // execute final project action - executeProject(pipeline, final_project); + executeProject(pipeline, final_project, "final projection"); // execute limit if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::TypeLimit) { @@ -669,12 +688,15 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } } -void DAGQueryBlockInterpreter::executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols) +void DAGQueryBlockInterpreter::executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols, const String & extra_info) { if (project_cols.empty()) return; ExpressionActionsPtr project = generateProjectExpressionActions(pipeline.firstStream(), context, project_cols); - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, project, log->identifier()); }); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, project, log->identifier()); + stream->setExtraInfo(extra_info); + }); } void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline) @@ -687,7 +709,7 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline) pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, log->identifier(), false); }); if (pipeline.hasMoreThanOneStream()) { - executeUnion(pipeline, max_streams, log); + executeUnion(pipeline, max_streams, log, false, "for partial limit"); pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, log->identifier(), false); }); } } @@ -734,7 +756,7 @@ BlockInputStreams DAGQueryBlockInterpreter::execute() executeImpl(pipeline); if (!pipeline.streams_with_non_joined_data.empty()) { - executeUnion(pipeline, max_streams, log); + executeUnion(pipeline, max_streams, log, false, "final union for non_joined_data"); restorePipelineConcurrency(pipeline); } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 9b95a5c3e93..e68c4f91cee 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -66,8 +66,8 @@ class DAGQueryBlockInterpreter void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection); void handleWindow(DAGPipeline & pipeline, const tipb::Window & window); void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort); - void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column); - void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr); + void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column, const String & extra_info = ""); + void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info = ""); void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc); void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit); void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns); @@ -82,7 +82,7 @@ class DAGQueryBlockInterpreter const TiDB::TiDBCollators & collators, AggregateDescriptions & aggregate_descriptions, bool is_final_agg); - void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols); + void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols, const String & extra_info = ""); void handleExchangeSender(DAGPipeline & pipeline); void handleMockExchangeSender(DAGPipeline & pipeline); diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index f5354994b44..d91b18254f6 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -186,7 +186,7 @@ void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline) QuotaForIntervals & quota = context.getQuota(); pipeline.transform([&](auto & stream) { - if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) + if (auto * p_stream = dynamic_cast(stream.get())) { p_stream->setLimits(limits); p_stream->setQuota(quota); @@ -373,8 +373,10 @@ void DAGStorageInterpreter::executePushedDownFilter( { auto & stream = pipeline.streams[i]; stream = std::make_shared(stream, before_where, filter_column_name, log->identifier()); + stream->setExtraInfo("push down filter"); // after filter, do project action to keep the schema of local streams and remote streams the same. stream = std::make_shared(stream, project_after_where, log->identifier()); + stream->setExtraInfo("projection after push down filter"); } } @@ -412,6 +414,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan( { auto & stream = pipeline.streams[i++]; stream = std::make_shared(stream, extra_cast, log->identifier()); + stream->setExtraInfo("cast after local tableScan"); } // remote streams if (i < pipeline.streams.size()) @@ -424,6 +427,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan( { auto & stream = pipeline.streams[i++]; stream = std::make_shared(stream, project_for_cop_read, log->identifier()); + stream->setExtraInfo("cast after remote tableScan"); } } } diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index b7c75c06e67..741aa7b5e26 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -26,7 +26,7 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) , dag(dag_) { const Settings & settings = context.getSettingsRef(); - if (dagContext().isBatchCop() || dagContext().isMPPTask()) + if (dagContext().isBatchCop() || (dagContext().isMPPTask() && !dagContext().isTest())) max_streams = settings.max_threads; else if (dagContext().isTest()) max_streams = dagContext().initialize_concurrency; @@ -85,9 +85,9 @@ BlockIO InterpreterDAG::execute() /// add union to run in parallel if needed if (dagContext().isMPPTask()) /// MPPTask do not need the returned blocks. - executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true); + executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp"); else - executeUnion(pipeline, max_streams, dagContext().log); + executeUnion(pipeline, max_streams, dagContext().log, false, "for non mpp"); if (dagContext().hasSubquery()) { const Settings & settings = context.getSettingsRef(); diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 69060071997..c9810454218 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -34,6 +34,7 @@ void restoreConcurrency( { BlockInputStreamPtr shared_query_block_input_stream = std::make_shared(concurrency * 5, pipeline.firstStream(), log->identifier()); + shared_query_block_input_stream->setExtraInfo("restore concurrency"); pipeline.streams.assign(concurrency, shared_query_block_input_stream); } } @@ -50,9 +51,15 @@ BlockInputStreamPtr combinedNonJoinedDataStream( else if (pipeline.streams_with_non_joined_data.size() > 1) { if (ignore_block) + { ret = std::make_shared(pipeline.streams_with_non_joined_data, nullptr, max_threads, log->identifier()); + ret->setExtraInfo("combine non joined(ignore block)"); + } else + { ret = std::make_shared(pipeline.streams_with_non_joined_data, nullptr, max_threads, log->identifier()); + ret->setExtraInfo("combine non joined"); + } } pipeline.streams_with_non_joined_data.clear(); return ret; @@ -62,7 +69,8 @@ void executeUnion( DAGPipeline & pipeline, size_t max_streams, const LoggerPtr & log, - bool ignore_block) + bool ignore_block, + const String & extra_info) { if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty()) return; @@ -73,6 +81,7 @@ void executeUnion( pipeline.firstStream() = std::make_shared(pipeline.streams, non_joined_data_stream, max_streams, log->identifier()); else pipeline.firstStream() = std::make_shared(pipeline.streams, non_joined_data_stream, max_streams, log->identifier()); + pipeline.firstStream()->setExtraInfo(extra_info); pipeline.streams.resize(1); } else if (non_joined_data_stream != nullptr) diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 91e6d483220..5c4d4721d5e 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -37,7 +37,8 @@ void executeUnion( DAGPipeline & pipeline, size_t max_streams, const LoggerPtr & log, - bool ignore_block = false); + bool ignore_block = false, + const String & extra_info = ""); ExpressionActionsPtr generateProjectExpressionActions( const BlockInputStreamPtr & stream, diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index 0d07159b8fc..aed9d9e90f9 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -47,18 +47,18 @@ try .build(context); { String expected = R"( -Union - SharedQuery x 10 - Expression - MergeSorting - Union - PartialSorting x 10 - Expression - Filter - SharedQuery - ParallelAggregating - Expression x 10 - Filter +Union: + SharedQuery x 10: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -72,18 +72,18 @@ Union { String expected = R"( -Union - SharedQuery x 10 - Limit - Union - Limit x 10 - Expression - Expression - Filter - SharedQuery - ParallelAggregating - Expression x 10 - Filter +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -100,17 +100,17 @@ try .build(context); { String expected = R"( -Union - Expression x 10 - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -122,18 +122,18 @@ Union .build(context); { String expected = R"( -Union - Expression x 10 - Expression - Expression - SharedQuery - Expression - MergeSorting - Union - PartialSorting x 10 - Expression - Expression - Expression +Union: + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -147,24 +147,24 @@ Union .build(context); { String expected = R"( -Union - Expression x 10 - Expression - Expression - Expression - SharedQuery - ParallelAggregating - Expression x 10 - Expression - Expression - SharedQuery - Expression - MergeSorting - Union - PartialSorting x 10 - Expression - Expression - Expression +Union: + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -181,33 +181,33 @@ Union .build(context); { String expected = R"( -Union - SharedQuery x 10 - Limit - Union - Limit x 10 - Expression - Expression - Expression - Expression - Expression - Filter - Expression - Expression - Expression - SharedQuery - ParallelAggregating - Expression x 10 - Expression - Expression - SharedQuery - Expression - MergeSorting - Union - PartialSorting x 10 - Expression - Expression - Expression +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Expression: + Expression: + Expression: + Filter: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -231,24 +231,24 @@ Union { String expected = R"( CreatingSets - Union - HashJoinBuildBlockInputStream x 10 - Expression - Expression + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: MockTableScan - Union x 2 - HashJoinBuildBlockInputStream x 10 - Expression - Expression - Expression - HashJoinProbe - Expression + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: MockTableScan - Union - Expression x 10 - Expression - HashJoinProbe - Expression + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -260,17 +260,17 @@ CreatingSets .build(context); { String expected = R"( -Union - Expression x 10 - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -283,18 +283,18 @@ Union .build(context); { String expected = R"( -Union +Union: MockExchangeSender x 10 - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression - Expression + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -318,24 +318,24 @@ Union { String expected = R"( CreatingSets - Union - HashJoinBuildBlockInputStream x 10 - Expression - Expression + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: MockExchangeReceiver - Union x 2 - HashJoinBuildBlockInputStream x 10 - Expression - Expression - Expression - HashJoinProbe - Expression + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: MockExchangeReceiver - Union - Expression x 10 - Expression - HashJoinProbe - Expression + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -360,25 +360,25 @@ CreatingSets { String expected = R"( CreatingSets - Union - HashJoinBuildBlockInputStream x 10 - Expression - Expression + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: MockExchangeReceiver - Union x 2 - HashJoinBuildBlockInputStream x 10 - Expression - Expression - Expression - HashJoinProbe - Expression + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: MockExchangeReceiver - Union + Union: MockExchangeSender x 10 - Expression - Expression - HashJoinProbe - Expression + Expression: + Expression: + HashJoinProbe: + Expression: MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 45d5293d584..8e75a64427c 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -225,7 +225,7 @@ void ExpressionAction::prepare(Block & sample_block) for (const auto & name : array_joined_columns) { ColumnWithTypeAndName & current = sample_block.getByName(name); - const DataTypeArray * array_type = typeid_cast(&*current.type); + const auto * array_type = typeid_cast(&*current.type); if (!array_type) throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH); current.type = array_type->getNestedType(); @@ -354,7 +354,7 @@ void ExpressionAction::execute(Block & block) const if (ColumnPtr converted = any_array_ptr->convertToFullColumnIfConst()) any_array_ptr = converted; - const ColumnArray * any_array = typeid_cast(&*any_array_ptr); + const auto * any_array = typeid_cast(&*any_array_ptr); if (!any_array) throw Exception("ARRAY JOIN of not array: " + *array_joined_columns.begin(), ErrorCodes::TYPE_MISMATCH); @@ -461,8 +461,7 @@ void ExpressionAction::executeOnTotals(Block & block) const join->joinTotals(block); } - -std::string ExpressionAction::toString() const +String ExpressionAction::toString() const { std::stringstream ss; switch (type) @@ -496,7 +495,7 @@ std::string ExpressionAction::toString() const case ARRAY_JOIN: ss << (array_join_is_left ? "LEFT " : "") << "ARRAY JOIN "; - for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it) + for (auto it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it) { if (it != array_joined_columns.begin()) ss << ", "; @@ -506,7 +505,7 @@ std::string ExpressionAction::toString() const case JOIN: ss << "JOIN "; - for (NamesAndTypesList::const_iterator it = columns_added_by_join.begin(); it != columns_added_by_join.end(); ++it) + for (auto it = columns_added_by_join.begin(); it != columns_added_by_join.end(); ++it) { if (it != columns_added_by_join.begin()) ss << ", "; @@ -529,7 +528,6 @@ std::string ExpressionAction::toString() const default: throw Exception("Unexpected Action type", ErrorCodes::LOGICAL_ERROR); } - return ss.str(); } @@ -842,9 +840,9 @@ void ExpressionActions::finalize(const Names & output_columns) if (final_columns.empty() && !input_columns.empty()) final_columns.insert(getSmallestColumn(input_columns)); - for (NamesAndTypesList::iterator it = input_columns.begin(); it != input_columns.end();) + for (auto it = input_columns.begin(); it != input_columns.end();) { - NamesAndTypesList::iterator it0 = it; + auto it0 = it; ++it; if (!needed_columns.count(it0->name)) { @@ -931,8 +929,8 @@ std::string ExpressionActions::dumpActions() const ss << "\noutput:\n"; NamesAndTypesList output_columns = sample_block.getNamesAndTypesList(); - for (NamesAndTypesList::const_iterator it = output_columns.begin(); it != output_columns.end(); ++it) - ss << it->name << " " << it->type->getName() << "\n"; + for (const auto & output_column : output_columns) + ss << output_column.name << " " << output_column.type->getName() << "\n"; return ss.str(); } diff --git a/dbms/src/Interpreters/ExpressionActions.h b/dbms/src/Interpreters/ExpressionActions.h index 7ec54a1a8ae..e8fb48f4e3f 100644 --- a/dbms/src/Interpreters/ExpressionActions.h +++ b/dbms/src/Interpreters/ExpressionActions.h @@ -251,7 +251,7 @@ struct ExpressionActionsChain ExpressionActionsPtr actions; Names required_output; - Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) + explicit Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) : actions(actions_) , required_output(required_output_) {} diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 6c96e7c22ad..96cfc0a58ae 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -480,7 +480,7 @@ void executeQuery( if (streams.in) { - const ASTQueryWithOutput * ast_query_with_output = dynamic_cast(ast.get()); + const auto * ast_query_with_output = dynamic_cast(ast.get()); WriteBuffer * out_buf = &ostr; std::optional out_file_buf;