From 4156434e11314128b37f51d988418c73dcdebd86 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 7 Nov 2022 18:19:04 +0800 Subject: [PATCH 1/4] remove delta mode --- .../DataStreams/TiRemoteBlockInputStream.h | 90 +++++-------------- .../Coprocessor/ExecutionSummaryCollector.cpp | 27 ++---- .../Coprocessor/ExecutionSummaryCollector.h | 6 +- .../StreamingDAGResponseWriter.cpp | 2 +- .../Coprocessor/UnaryDAGResponseWriter.cpp | 2 +- .../Mpp/BroadcastOrPassThroughWriter.cpp | 2 +- .../Flash/Mpp/FineGrainedShuffleWriter.cpp | 2 +- dbms/src/Flash/Mpp/HashPartitionWriter.cpp | 2 +- 8 files changed, 36 insertions(+), 97 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index 8f7f54ed2b7..b1531ac9657 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -67,65 +67,25 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream std::unique_ptr decoder_ptr; - void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) + void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { - for (const auto & execution_summary : resp.execution_summaries()) - { - if (execution_summary.has_executor_id()) - { - const auto & executor_id = execution_summary.executor_id(); - execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns(); - execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows(); - execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations(); - execution_summaries[index][executor_id].concurrency = execution_summary.concurrency(); - } - } - execution_summaries_inited[index].store(true); - } - - void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index, bool is_streaming_call) - { - if (resp.execution_summaries_size() == 0) + if (unlikely(resp.execution_summaries_size() == 0)) return; - if (!execution_summaries_inited[index].load()) - { - initRemoteExecutionSummaries(resp, index); - return; - } - auto & execution_summaries_map = execution_summaries[index]; + // There will only be one summary packet for one index. + RUNTIME_CHECK(!execution_summaries_inited[index].load()); for (const auto & execution_summary : resp.execution_summaries()) { - if (execution_summary.has_executor_id()) + if (likely(execution_summary.has_executor_id())) { - const auto & executor_id = execution_summary.executor_id(); - if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end())) - { - LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id); - continue; - } - auto & current_execution_summary = execution_summaries_map[executor_id]; - if (is_streaming_call) - { - current_execution_summary.time_processed_ns - = std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); - current_execution_summary.num_produced_rows - = std::max(current_execution_summary.num_produced_rows, execution_summary.num_produced_rows()); - current_execution_summary.num_iterations - = std::max(current_execution_summary.num_iterations, execution_summary.num_iterations()); - current_execution_summary.concurrency - = std::max(current_execution_summary.concurrency, execution_summary.concurrency()); - } - else - { - current_execution_summary.time_processed_ns - = std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); - current_execution_summary.num_produced_rows += execution_summary.num_produced_rows(); - current_execution_summary.num_iterations += execution_summary.num_iterations(); - current_execution_summary.concurrency += execution_summary.concurrency(); - } + auto & remote_execution_summary = execution_summaries[index][execution_summary.executor_id()]; + remote_execution_summary.time_processed_ns = execution_summary.time_processed_ns(); + remote_execution_summary.num_produced_rows = execution_summary.num_produced_rows(); + remote_execution_summary.num_iterations = execution_summary.num_iterations(); + remote_execution_summary.concurrency = execution_summary.concurrency(); } } + execution_summaries_inited[index].store(true); } bool fetchRemoteResult() @@ -145,29 +105,21 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString()); throw Exception(result.resp->error().DebugString()); } - /// only the last response contains execution summaries - if (result.resp != nullptr) - { - if constexpr (is_streaming_reader) - { - addRemoteExecutionSummaries(*result.resp, result.call_index, true); - } - else - { - addRemoteExecutionSummaries(*result.resp, 0, false); - } - } - - const auto & decode_detail = result.decode_detail; - total_rows += decode_detail.rows; size_t index = 0; if constexpr (is_streaming_reader) index = result.call_index; - connection_profile_infos[index].packets += decode_detail.packets; - connection_profile_infos[index].bytes += decode_detail.packet_bytes; + /// only the last response contains execution summaries + if (result.resp != nullptr) + addRemoteExecutionSummaries(*result.resp, index); + + const auto & decode_detail = result.decode_detail; + auto & connection_profile_info = connection_profile_infos[index]; + connection_profile_info.packets += decode_detail.packets; + connection_profile_info.bytes += decode_detail.packet_bytes; + total_rows += decode_detail.rows; LOG_TRACE( log, "recv {} rows from remote for {}, total recv row num: {}", @@ -198,7 +150,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); - constexpr size_t squash_rows_limit = 8192; + static constexpr size_t squash_rows_limit = 8192; if constexpr (is_streaming_reader) decoder_ptr = std::make_unique(sample_block, squash_rows_limit); } diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp index 61bfb95c7c8..ea3424aaa8f 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp @@ -18,26 +18,15 @@ namespace DB { - -/// delta_mode means when for a streaming call, return the delta execution summary -/// because TiDB is not aware of the streaming call when it handle the execution summaries -/// so we need to "pretend to be a unary call", can be removed if TiDB support streaming -/// call's execution summaries directly void ExecutionSummaryCollector::fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id, - bool delta_mode) + const String & executor_id) { - auto & prev_stats = previous_execution_stats[executor_id]; - - execution_summary->set_time_processed_ns( - delta_mode ? current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns); - execution_summary->set_num_produced_rows( - delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows); - execution_summary->set_num_iterations(delta_mode ? current.num_iterations - prev_stats.num_iterations : current.num_iterations); - execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency); - prev_stats = current; + execution_summary->set_time_processed_ns(current.time_processed_ns); + execution_summary->set_num_produced_rows(current.num_produced_rows); + execution_summary->set_num_iterations(current.num_iterations); + execution_summary->set_concurrency(current.concurrency); if (dag_context.return_executor_id) execution_summary->set_executor_id(executor_id); } @@ -65,7 +54,7 @@ void mergeRemoteExecuteSummaries( } } -void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode) +void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response) { if (!dag_context.collect_execution_summaries) return; @@ -133,7 +122,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo } current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id); }; /// add execution_summary for local executor @@ -161,7 +150,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo ExecutionSummary merged; for (auto & remote : p.second) merged.merge(remote, false); - fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first); } } } diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h index c537f3979e0..fc7b0199e04 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h @@ -32,18 +32,16 @@ class ExecutionSummaryCollector } } - void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode); + void addExecuteSummaries(tipb::SelectResponse & response); private: void fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id, - bool delta_mode); + const String & executor_id); private: DAGContext & dag_context; - std::unordered_map previous_execution_stats; std::unordered_set local_executors; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index a77d086744a..5b8744fbc90 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -105,7 +105,7 @@ void StreamingDAGResponseWriter::encodeThenWriteBlocks() { TrackedSelectResp response; if constexpr (send_exec_summary_at_last) - summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/true); + summary_collector.addExecuteSummaries(response.getResponse()); response.setEncodeType(dag_context.encode_type); if (blocks.empty()) { diff --git a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp index 04d6dc2fc4e..b603d74f1d6 100644 --- a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp @@ -77,7 +77,7 @@ void UnaryDAGResponseWriter::finishWrite() encodeChunkToDAGResponse(); } appendWarningsToDAGResponse(); - summary_collector.addExecuteSummaries(*dag_response, false); + summary_collector.addExecuteSummaries(*dag_response); } void UnaryDAGResponseWriter::write(const Block & block) diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index 99e2d37bd91..a67029b3572 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -80,7 +80,7 @@ void BroadcastOrPassThroughWriter::encodeThenWriteBlocks() if constexpr (send_exec_summary_at_last) { TrackedSelectResp response; - summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/false); + summary_collector.addExecuteSummaries(response.getResponse()); tracked_packet.serializeByResponse(response.getResponse()); } if (blocks.empty()) diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index ceac91b2d71..ea2670de9d5 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -182,7 +182,7 @@ void FineGrainedShuffleWriter::writePackets(std::vector::writePackets(std::vector Date: Mon, 7 Nov 2022 18:50:14 +0800 Subject: [PATCH 2/4] update --- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 2 +- dbms/src/Flash/Mpp/newMPPExchangeWriter.h | 1 + dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index b1a9bc55e42..0afdb8a7673 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -132,7 +132,7 @@ try streaming_writer, context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, - true, + dag_context.collect_execution_summaries, dag_context); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); copyData(*streams.in, *dag_output_stream); diff --git a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h index 541ba94498c..5b7a5fd3b69 100644 --- a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h +++ b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h @@ -36,6 +36,7 @@ std::unique_ptr newMPPExchangeWriter( UInt64 fine_grained_shuffle_batch_size) { RUNTIME_CHECK(dag_context.isMPPTask()); + should_send_exec_summary_at_last = dag_context.collect_execution_summaries && should_send_exec_summary_at_last; if (dag_context.isRootMPPTask()) { RUNTIME_CHECK(!enable_fine_grained_shuffle); diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp index 4b9faee1707..fcb1be3cea8 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp @@ -76,7 +76,7 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con exchange_type, context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + /*should_send_exec_summary_at_last=*/stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response dag_context, fine_grained_shuffle.enable(), fine_grained_shuffle.stream_count, From 49893216b15175dcb589a42a226ec76266b1ec5b Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 7 Nov 2022 19:48:16 +0800 Subject: [PATCH 3/4] fix clang-tidy --- dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp | 2 +- dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp index ea3424aaa8f..0cd64571136 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp @@ -21,7 +21,7 @@ namespace DB void ExecutionSummaryCollector::fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id) + const String & executor_id) const { execution_summary->set_time_processed_ns(current.time_processed_ns); execution_summary->set_num_produced_rows(current.num_produced_rows); diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h index fc7b0199e04..f40eca91840 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h @@ -38,7 +38,7 @@ class ExecutionSummaryCollector void fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, - const String & executor_id); + const String & executor_id) const; private: DAGContext & dag_context; From f0b440e53d7b72299f5b7deb8f3d1afce83df92e Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 8 Nov 2022 11:11:48 +0800 Subject: [PATCH 4/4] fix --- .../DataStreams/TiRemoteBlockInputStream.h | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index b1531ac9657..0f76b60df6b 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -67,13 +67,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream std::unique_ptr decoder_ptr; - void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) + void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { - if (unlikely(resp.execution_summaries_size() == 0)) - return; - - // There will only be one summary packet for one index. - RUNTIME_CHECK(!execution_summaries_inited[index].load()); for (const auto & execution_summary : resp.execution_summaries()) { if (likely(execution_summary.has_executor_id())) @@ -88,6 +83,42 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries_inited[index].store(true); } + void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) + { + if (unlikely(resp.execution_summaries_size() == 0)) + return; + + if (!execution_summaries_inited[index].load()) + { + initRemoteExecutionSummaries(resp, index); + return; + } + if constexpr (is_streaming_reader) + throw Exception( + fmt::format( + "There are more than one execution summary packet of index {} in streaming reader, " + "this should not happen", + index)); + auto & execution_summaries_map = execution_summaries[index]; + for (const auto & execution_summary : resp.execution_summaries()) + { + if (likely(execution_summary.has_executor_id())) + { + const auto & executor_id = execution_summary.executor_id(); + if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end())) + { + LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id); + continue; + } + auto & remote_execution_summary = execution_summaries_map[executor_id]; + remote_execution_summary.time_processed_ns = std::max(remote_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); + remote_execution_summary.num_produced_rows += execution_summary.num_produced_rows(); + remote_execution_summary.num_iterations += execution_summary.num_iterations(); + remote_execution_summary.concurrency += execution_summary.concurrency(); + } + } + } + bool fetchRemoteResult() { while (true)