diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 8f7f54ed2b7..0f76b60df6b 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -71,21 +71,21 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { for (const auto & execution_summary : resp.execution_summaries()) { - if (execution_summary.has_executor_id()) + if (likely(execution_summary.has_executor_id())) { - const auto & executor_id = execution_summary.executor_id(); - execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns(); - execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows(); - execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations(); - execution_summaries[index][executor_id].concurrency = execution_summary.concurrency(); + auto & remote_execution_summary = execution_summaries[index][execution_summary.executor_id()]; + remote_execution_summary.time_processed_ns = execution_summary.time_processed_ns(); + remote_execution_summary.num_produced_rows = execution_summary.num_produced_rows(); + remote_execution_summary.num_iterations = execution_summary.num_iterations(); + remote_execution_summary.concurrency = execution_summary.concurrency(); } } execution_summaries_inited[index].store(true); } - void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index, bool is_streaming_call) + void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { - if (resp.execution_summaries_size() == 0) + if (unlikely(resp.execution_summaries_size() == 0)) return; if (!execution_summaries_inited[index].load()) @@ -93,10 +93,16 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream initRemoteExecutionSummaries(resp, index); return; } + if constexpr (is_streaming_reader) + throw Exception( + fmt::format( + "There are more than one execution summary packet of index {} in streaming reader, " + "this should not happen", + index)); auto & execution_summaries_map = execution_summaries[index]; for (const auto & execution_summary : resp.execution_summaries()) { - if (execution_summary.has_executor_id()) + if (likely(execution_summary.has_executor_id())) { const auto & executor_id = execution_summary.executor_id(); if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end())) @@ -104,26 +110,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id); continue; } - auto & current_execution_summary = execution_summaries_map[executor_id]; - if (is_streaming_call) - { - current_execution_summary.time_processed_ns - = std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); - current_execution_summary.num_produced_rows - = std::max(current_execution_summary.num_produced_rows, execution_summary.num_produced_rows()); - current_execution_summary.num_iterations - = std::max(current_execution_summary.num_iterations, execution_summary.num_iterations()); - current_execution_summary.concurrency - = std::max(current_execution_summary.concurrency, execution_summary.concurrency()); - } - else - { - current_execution_summary.time_processed_ns - = std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); - current_execution_summary.num_produced_rows += execution_summary.num_produced_rows(); - current_execution_summary.num_iterations += execution_summary.num_iterations(); - current_execution_summary.concurrency += execution_summary.concurrency(); - } + auto & remote_execution_summary = execution_summaries_map[executor_id]; + remote_execution_summary.time_processed_ns = std::max(remote_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); + remote_execution_summary.num_produced_rows += execution_summary.num_produced_rows(); + remote_execution_summary.num_iterations += execution_summary.num_iterations(); + remote_execution_summary.concurrency += execution_summary.concurrency(); } } } @@ -145,29 +136,21 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); throw Exception(result.resp->error().DebugString()); } - /// only the last response contains execution summaries - if (result.resp != nullptr) - { - if constexpr (is_streaming_reader) - { - addRemoteExecutionSummaries(*result.resp, result.call_index, true); - } - else - { - addRemoteExecutionSummaries(*result.resp, 0, false); - } - } - - const auto & decode_detail = result.decode_detail; - total_rows += decode_detail.rows; size_t index = 0; if constexpr (is_streaming_reader) index = result.call_index; - connection_profile_infos[index].packets += decode_detail.packets; - connection_profile_infos[index].bytes += decode_detail.packet_bytes; + /// only the last response contains execution summaries + if (result.resp != nullptr) + addRemoteExecutionSummaries(*result.resp, index); + const auto & decode_detail = result.decode_detail; + auto & connection_profile_info = connection_profile_infos[index]; + connection_profile_info.packets += decode_detail.packets; + connection_profile_info.bytes += decode_detail.packet_bytes; + + total_rows += decode_detail.rows; LOG_TRACE( log, "recv {} rows from remote for {}, total recv row num: {}", @@ -198,7 +181,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); - constexpr size_t squash_rows_limit = 8192; + static constexpr size_t squash_rows_limit = 8192; if constexpr (is_streaming_reader) decoder_ptr = std::make_unique(sample_block, squash_rows_limit); } diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index b1a9bc55e42..0afdb8a7673 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -132,7 +132,7 @@ try streaming_writer, context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, - true, + dag_context.collect_execution_summaries, dag_context); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); copyData(*streams.in, *dag_output_stream); diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp index 61bfb95c7c8..0cd64571136 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp @@ -18,26 +18,15 @@ namespace DB { - -/// delta_mode means when for a streaming call, return the delta execution summary -/// because TiDB is not aware of the streaming call when it handle the execution summaries -/// so we need to "pretend to be a unary call", can be removed if TiDB support streaming -/// call's execution summaries directly void ExecutionSummaryCollector::fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id, - bool delta_mode) + const String & executor_id) const { - auto & prev_stats = previous_execution_stats[executor_id]; - - execution_summary->set_time_processed_ns( - delta_mode ? current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns); - execution_summary->set_num_produced_rows( - delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows); - execution_summary->set_num_iterations(delta_mode ? current.num_iterations - prev_stats.num_iterations : current.num_iterations); - execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency); - prev_stats = current; + execution_summary->set_time_processed_ns(current.time_processed_ns); + execution_summary->set_num_produced_rows(current.num_produced_rows); + execution_summary->set_num_iterations(current.num_iterations); + execution_summary->set_concurrency(current.concurrency); if (dag_context.return_executor_id) execution_summary->set_executor_id(executor_id); } @@ -65,7 +54,7 @@ void mergeRemoteExecuteSummaries( } } -void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode) +void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response) { if (!dag_context.collect_execution_summaries) return; @@ -133,7 +122,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo } current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id); }; /// add execution_summary for local executor @@ -161,7 +150,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo ExecutionSummary merged; for (auto & remote : p.second) merged.merge(remote, false); - fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first); } } } diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h index c537f3979e0..f40eca91840 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h @@ -32,18 +32,16 @@ class ExecutionSummaryCollector } } - void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode); + void addExecuteSummaries(tipb::SelectResponse & response); private: void fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id, - bool delta_mode); + const String & executor_id) const; private: DAGContext & dag_context; - std::unordered_map previous_execution_stats; std::unordered_set local_executors; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index e3bac59d1a7..7d957d99d13 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -74,7 +74,7 @@ template void StreamingDAGResponseWriter::sendExecutionSummary() { tipb::SelectResponse response; - summary_collector.addExecuteSummaries(response, /*delta_mode=*/true); + summary_collector.addExecuteSummaries(response); writer->write(response); } diff --git a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp index 04d6dc2fc4e..b603d74f1d6 100644 --- a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp @@ -77,7 +77,7 @@ void UnaryDAGResponseWriter::finishWrite() encodeChunkToDAGResponse(); } appendWarningsToDAGResponse(); - summary_collector.addExecuteSummaries(*dag_response, false); + summary_collector.addExecuteSummaries(*dag_response); } void UnaryDAGResponseWriter::write(const Block & block) diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index 6f66d211340..bf5b9e75efe 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -47,7 +47,7 @@ template void BroadcastOrPassThroughWriter::sendExecutionSummary() { tipb::SelectResponse response; - summary_collector.addExecuteSummaries(response, /*delta_mode=*/false); + summary_collector.addExecuteSummaries(response); writer->sendExecutionSummary(response); } diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index 83d4fa9ab18..c04df4c8e42 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -58,7 +58,7 @@ template void FineGrainedShuffleWriter::sendExecutionSummary() { tipb::SelectResponse response; - summary_collector.addExecuteSummaries(response, /*delta_mode=*/false); + summary_collector.addExecuteSummaries(response); writer->sendExecutionSummary(response); } diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index e5add961e9f..f7da45a09b0 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -54,7 +54,7 @@ template void HashPartitionWriter::sendExecutionSummary() { tipb::SelectResponse response; - summary_collector.addExecuteSummaries(response, /*delta_mode=*/false); + summary_collector.addExecuteSummaries(response); writer->sendExecutionSummary(response); } diff --git a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h index d2a0c2f173f..9310caabcf5 100644 --- a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h +++ b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h @@ -36,6 +36,7 @@ std::unique_ptr newMPPExchangeWriter( UInt64 fine_grained_shuffle_batch_size) { RUNTIME_CHECK(dag_context.isMPPTask()); + should_send_exec_summary_at_last = dag_context.collect_execution_summaries && should_send_exec_summary_at_last; if (dag_context.isRootMPPTask()) { RUNTIME_CHECK(!enable_fine_grained_shuffle); diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp index 4b9faee1707..fcb1be3cea8 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp @@ -76,7 +76,7 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con exchange_type, context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + /*should_send_exec_summary_at_last=*/stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response dag_context, fine_grained_shuffle.enable(), fine_grained_shuffle.stream_count,