.*: remove delta mode of execution summary #6271

Merged
merged 7 commits into from
Nov 10, 2022

Changes from 5 commits
77 changes: 30 additions & 47 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
@@ -71,59 +71,50 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
if (likely(execution_summary.has_executor_id()))
{
const auto & executor_id = execution_summary.executor_id();
execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns();
execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows();
execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations();
execution_summaries[index][executor_id].concurrency = execution_summary.concurrency();
auto & remote_execution_summary = execution_summaries[index][execution_summary.executor_id()];
remote_execution_summary.time_processed_ns = execution_summary.time_processed_ns();
remote_execution_summary.num_produced_rows = execution_summary.num_produced_rows();
remote_execution_summary.num_iterations = execution_summary.num_iterations();
remote_execution_summary.concurrency = execution_summary.concurrency();
}
}
execution_summaries_inited[index].store(true);
}

void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index, bool is_streaming_call)
void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
if (resp.execution_summaries_size() == 0)
if (unlikely(resp.execution_summaries_size() == 0))
return;

if (!execution_summaries_inited[index].load())
{
initRemoteExecutionSummaries(resp, index);
return;
}
if constexpr (is_streaming_reader)
throw Exception(
fmt::format(
"There are more than one execution summary packet of index {} in streaming reader, "
"this should not happen",
index));
auto & execution_summaries_map = execution_summaries[index];
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
if (likely(execution_summary.has_executor_id()))
{
const auto & executor_id = execution_summary.executor_id();
if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end()))
{
LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
continue;
}
auto & current_execution_summary = execution_summaries_map[executor_id];
if (is_streaming_call)
{
current_execution_summary.time_processed_ns
= std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns());
current_execution_summary.num_produced_rows
= std::max(current_execution_summary.num_produced_rows, execution_summary.num_produced_rows());
current_execution_summary.num_iterations
= std::max(current_execution_summary.num_iterations, execution_summary.num_iterations());
current_execution_summary.concurrency
= std::max(current_execution_summary.concurrency, execution_summary.concurrency());
}
else
{
current_execution_summary.time_processed_ns
= std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns());
current_execution_summary.num_produced_rows += execution_summary.num_produced_rows();
current_execution_summary.num_iterations += execution_summary.num_iterations();
current_execution_summary.concurrency += execution_summary.concurrency();
}
auto & remote_execution_summary = execution_summaries_map[executor_id];
remote_execution_summary.time_processed_ns = std::max(remote_execution_summary.time_processed_ns, execution_summary.time_processed_ns());
remote_execution_summary.num_produced_rows += execution_summary.num_produced_rows();
remote_execution_summary.num_iterations += execution_summary.num_iterations();
remote_execution_summary.concurrency += execution_summary.concurrency();
}
}
}
@@ -145,29 +136,21 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString());
throw Exception(result.resp->error().DebugString());
}
/// only the last response contains execution summaries
if (result.resp != nullptr)
{
if constexpr (is_streaming_reader)
{
addRemoteExecutionSummaries(*result.resp, result.call_index, true);
}
else
{
addRemoteExecutionSummaries(*result.resp, 0, false);
}
}

const auto & decode_detail = result.decode_detail;
total_rows += decode_detail.rows;

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

connection_profile_infos[index].packets += decode_detail.packets;
connection_profile_infos[index].bytes += decode_detail.packet_bytes;
/// only the last response contains execution summaries
if (result.resp != nullptr)
addRemoteExecutionSummaries(*result.resp, index);

const auto & decode_detail = result.decode_detail;
auto & connection_profile_info = connection_profile_infos[index];
connection_profile_info.packets += decode_detail.packets;
connection_profile_info.bytes += decode_detail.packet_bytes;

total_rows += decode_detail.rows;
LOG_TRACE(
log,
"recv {} rows from remote for {}, total recv row num: {}",
@@ -198,7 +181,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
constexpr size_t squash_rows_limit = 8192;
static constexpr size_t squash_rows_limit = 8192;
if constexpr (is_streaming_reader)
decoder_ptr = std::make_unique<CHBlockChunkDecodeAndSquash>(sample_block, squash_rows_limit);
}
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -132,7 +132,7 @@ try
streaming_writer,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context.collect_execution_summaries,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
27 changes: 8 additions & 19 deletions dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp
@@ -18,26 +18,15 @@

namespace DB
{

/// delta_mode means that for a streaming call, we return the delta of the execution summary,
/// because TiDB is not aware of the streaming call when it handles the execution summaries,
/// so we need to "pretend to be a unary call"; this can be removed once TiDB supports
/// streaming calls' execution summaries directly
void ExecutionSummaryCollector::fillTiExecutionSummary(
tipb::ExecutorExecutionSummary * execution_summary,
ExecutionSummary & current,
const String & executor_id,
bool delta_mode)
const String & executor_id) const
{
auto & prev_stats = previous_execution_stats[executor_id];

execution_summary->set_time_processed_ns(
delta_mode ? current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns);
execution_summary->set_num_produced_rows(
delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows);
execution_summary->set_num_iterations(delta_mode ? current.num_iterations - prev_stats.num_iterations : current.num_iterations);
execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency);
prev_stats = current;
execution_summary->set_time_processed_ns(current.time_processed_ns);
execution_summary->set_num_produced_rows(current.num_produced_rows);
execution_summary->set_num_iterations(current.num_iterations);
execution_summary->set_concurrency(current.concurrency);
if (dag_context.return_executor_id)
execution_summary->set_executor_id(executor_id);
}
@@ -65,7 +54,7 @@ void mergeRemoteExecuteSummaries(
}
}

void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode)
void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response)
{
if (!dag_context.collect_execution_summaries)
return;
@@ -133,7 +122,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo
}

current.time_processed_ns += dag_context.compile_time_ns;
fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode);
fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id);
};

/// add execution_summary for local executor
@@ -161,7 +150,7 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo
ExecutionSummary merged;
for (auto & remote : p.second)
merged.merge(remote, false);
fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode);
fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first);
}
}
}
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h
@@ -32,18 +32,16 @@ class ExecutionSummaryCollector
}
}

void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode);
void addExecuteSummaries(tipb::SelectResponse & response);

private:
void fillTiExecutionSummary(
tipb::ExecutorExecutionSummary * execution_summary,
ExecutionSummary & current,
const String & executor_id,
bool delta_mode);
const String & executor_id) const;

private:
DAGContext & dag_context;
std::unordered_map<String, ExecutionSummary> previous_execution_stats;
std::unordered_set<String> local_executors;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
@@ -74,7 +74,7 @@ template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/true);
summary_collector.addExecuteSummaries(response);
writer->write(response);
}

2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
@@ -77,7 +77,7 @@ void UnaryDAGResponseWriter::finishWrite()
encodeChunkToDAGResponse();
}
appendWarningsToDAGResponse();
summary_collector.addExecuteSummaries(*dag_response, false);
summary_collector.addExecuteSummaries(*dag_response);
}

void UnaryDAGResponseWriter::write(const Block & block)
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
@@ -47,7 +47,7 @@ template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/false);
summary_collector.addExecuteSummaries(response);
writer->sendExecutionSummary(response);
}

2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
@@ -58,7 +58,7 @@ template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/false);
summary_collector.addExecuteSummaries(response);
writer->sendExecutionSummary(response);
}

2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/HashPartitionWriter.cpp
@@ -54,7 +54,7 @@ template <class ExchangeWriterPtr>
void HashPartitionWriter<ExchangeWriterPtr>::sendExecutionSummary()
{
tipb::SelectResponse response;
summary_collector.addExecuteSummaries(response, /*delta_mode=*/false);
summary_collector.addExecuteSummaries(response);
writer->sendExecutionSummary(response);
}

1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/newMPPExchangeWriter.h
@@ -36,6 +36,7 @@ std::unique_ptr<DAGResponseWriter> newMPPExchangeWriter(
UInt64 fine_grained_shuffle_batch_size)
{
RUNTIME_CHECK(dag_context.isMPPTask());
should_send_exec_summary_at_last = dag_context.collect_execution_summaries && should_send_exec_summary_at_last;
if (dag_context.isRootMPPTask())
{
RUNTIME_CHECK(!enable_fine_grained_shuffle);
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp
@@ -76,7 +76,7 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con
exchange_type,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to send execution summaries for the last response
/*should_send_exec_summary_at_last=*/stream_id++ == 0, /// only one stream needs to send execution summaries for the last response
dag_context,
fine_grained_shuffle.enable(),
fine_grained_shuffle.stream_count,