Skip to content

Commit

Permalink
cherry pick #1835 to release-5.0 (#1840)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: xufei <[email protected]>
  • Loading branch information
ti-srebot and windtalker authored Apr 30, 2021
1 parent 9fa616c commit 294c37e
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 56 deletions.
17 changes: 13 additions & 4 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,23 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
auto & current_execution_summary = execution_summaries[executor_id][index];
if (is_streaming_call)
{
current_execution_summary.time_processed_ns += execution_summary.time_processed_ns();
current_execution_summary.time_processed_ns
= std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns());
current_execution_summary.num_produced_rows
= std::max(current_execution_summary.num_produced_rows, execution_summary.num_produced_rows());
current_execution_summary.num_iterations
= std::max(current_execution_summary.num_iterations, execution_summary.num_iterations());
current_execution_summary.concurrency
= std::max(current_execution_summary.concurrency, execution_summary.concurrency());
}
else
{
current_execution_summary.time_processed_ns
= std::max(current_execution_summary.time_processed_ns, execution_summary.time_processed_ns());
current_execution_summary.num_produced_rows += execution_summary.num_produced_rows();
current_execution_summary.num_iterations += execution_summary.num_iterations();
current_execution_summary.concurrency += execution_summary.concurrency();
}
current_execution_summary.num_produced_rows += execution_summary.num_produced_rows();
current_execution_summary.num_iterations += execution_summary.num_iterations();
current_execution_summary.concurrency += execution_summary.concurrency();
}
}
}
Expand Down Expand Up @@ -190,6 +197,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
return execution_summaries_inited.load() ? &execution_summaries : nullptr;
}

bool isStreamingCall() const { return is_streaming_reader; }
};

using ExchangeReceiverInputStream = TiRemoteBlockInputStream<ExchangeReceiver>;
Expand Down
93 changes: 50 additions & 43 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,57 @@
namespace DB
{

/// delta_mode means when for a streaming call, return the delta execution summary
/// because TiDB is not aware of the streaming call when it handle the execution summaries
/// so we need to "pretend to be a unary call", can be removed if TiDB support streaming
/// call's execution summaries directly
void DAGResponseWriter::fillTiExecutionSummary(
tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, const String & executor_id)
tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, const String & executor_id, bool delta_mode)
{
auto & prev_stats = previous_execution_stats[executor_id];

execution_summary->set_time_processed_ns(current.time_processed_ns - prev_stats.time_processed_ns);
execution_summary->set_num_produced_rows(current.num_produced_rows - prev_stats.num_produced_rows);
execution_summary->set_num_iterations(current.num_iterations - prev_stats.num_iterations);
execution_summary->set_concurrency(current.concurrency - prev_stats.concurrency);
execution_summary->set_time_processed_ns(
delta_mode ? current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns);
execution_summary->set_num_produced_rows(
delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows);
execution_summary->set_num_iterations(delta_mode ? current.num_iterations - prev_stats.num_iterations : current.num_iterations);
execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency);
prev_stats = current;
if (dag_context.return_executor_id)
execution_summary->set_executor_id(executor_id);
}

void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response)
void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode)
{
if (!dag_context.collect_execution_summaries)
return;
/// get executionSummary info from remote input streams
std::unordered_map<String, std::vector<ExecutionSummary>> merged_remote_execution_summaries;
for (auto & streamPtr : dag_context.getRemoteInputStreams())
{
auto coprocessor_input_stream = dynamic_cast<CoprocessorBlockInputStream *>(streamPtr.get());
auto exchange_receiver_input_stream = dynamic_cast<ExchangeReceiverInputStream *>(streamPtr.get());
auto remote_execution_summaries = coprocessor_input_stream != nullptr
? coprocessor_input_stream->getRemoteExecutionSummaries()
: exchange_receiver_input_stream->getRemoteExecutionSummaries();
if (remote_execution_summaries != nullptr)
{
bool is_streaming_call = coprocessor_input_stream != nullptr ? coprocessor_input_stream->isStreamingCall()
: exchange_receiver_input_stream->isStreamingCall();
for (auto & p : *remote_execution_summaries)
{
if (merged_remote_execution_summaries[p.first].size() < p.second.size())
{
merged_remote_execution_summaries[p.first].resize(p.second.size());
}
for (size_t i = 0; i < p.second.size(); i++)
{
merged_remote_execution_summaries[p.first][i].merge(p.second[i], is_streaming_call);
}
}
}
}

/// add execution_summary for local executor
for (auto & p : dag_context.getProfileStreamsMap())
{
Expand All @@ -39,22 +72,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response)
current.concurrency++;
}
/// part 2: remote execution info
for (auto & streamPtr : dag_context.getRemoteInputStreams())
if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end())
{
auto remote_execution_summaries = dynamic_cast<CoprocessorBlockInputStream *>(streamPtr.get()) != nullptr
? dynamic_cast<CoprocessorBlockInputStream *>(streamPtr.get())->getRemoteExecutionSummaries()
: dynamic_cast<ExchangeReceiverInputStream *>(streamPtr.get())->getRemoteExecutionSummaries();
if (remote_execution_summaries != nullptr)
{
const auto & remote_execution_info = remote_execution_summaries->find(p.first);
if (remote_execution_info != remote_execution_summaries->end())
{
for (const auto & remote_execution_summary : remote_execution_info->second)
{
current.merge(remote_execution_summary);
}
}
}
for (auto & remote : merged_remote_execution_summaries[p.first])
current.merge(remote, false);
}
/// part 3: for join need to add the build time
for (auto & join_alias : dag_context.getQBIdToJoinAliasMap()[p.second.qb_id])
Expand All @@ -73,40 +94,26 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response)
}

current.time_processed_ns += dag_context.compile_time_ns;
fillTiExecutionSummary(response.add_execution_summaries(), current, p.first);
fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode);
/// do not have an easy and meaningful way to get the execution summary for exchange sender
/// executor, however, TiDB requires execution summary for all the executors, so just return
/// its child executor's execution summary
if (dag_context.isMPPTask() && p.first == dag_context.exchange_sender_execution_summary_key)
{
current.concurrency = dag_context.final_concurrency;
fillTiExecutionSummary(response.add_execution_summaries(), current, dag_context.exchange_sender_executor_id);
fillTiExecutionSummary(response.add_execution_summaries(), current, dag_context.exchange_sender_executor_id, delta_mode);
}
}
/// add executionSummary for remote executor
std::unordered_map<String, ExecutionSummary> merged_remote_execution_summaries;
for (auto & streamPtr : dag_context.getRemoteInputStreams())
for (auto & p : merged_remote_execution_summaries)
{
auto remote_execution_summaries = dynamic_cast<CoprocessorBlockInputStream *>(streamPtr.get()) != nullptr
? dynamic_cast<CoprocessorBlockInputStream *>(streamPtr.get())->getRemoteExecutionSummaries()
: dynamic_cast<ExchangeReceiverInputStream *>(streamPtr.get())->getRemoteExecutionSummaries();
if (remote_execution_summaries != nullptr)
if (local_executors.find(p.first) == local_executors.end())
{
for (auto & p : *remote_execution_summaries)
{
if (local_executors.find(p.first) == local_executors.end())
{
auto & current = merged_remote_execution_summaries[p.first];
for (const auto & remote_execution_summary : p.second)
{
current.merge(remote_execution_summary);
}
}
}
ExecutionSummary merged;
for (auto & remote : p.second)
merged.merge(remote, false);
fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode);
}
}
for (auto & p : merged_remote_execution_summaries)
fillTiExecutionSummary(response.add_execution_summaries(), p.second, p.first);
}

DAGResponseWriter::DAGResponseWriter(
Expand Down
25 changes: 18 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,22 @@ struct ExecutionSummary
UInt64 concurrency;
ExecutionSummary() : time_processed_ns(0), num_produced_rows(0), num_iterations(0), concurrency(0) {}

void merge(const ExecutionSummary & other)
void merge(const ExecutionSummary & other, bool streaming_call)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
if (streaming_call)
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows = std::max(num_produced_rows, other.num_produced_rows);
num_iterations = std::max(num_iterations, other.num_iterations);
concurrency = std::max(concurrency, other.concurrency);
}
else
{
time_processed_ns = std::max(time_processed_ns, other.time_processed_ns);
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
}
}
};

Expand All @@ -36,8 +46,9 @@ class DAGResponseWriter
public:
DAGResponseWriter(Int64 records_per_chunk_, tipb::EncodeType encode_type_, std::vector<tipb::FieldType> result_field_types_,
DAGContext & dag_context_);
void fillTiExecutionSummary(tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, const String & executor_id);
void addExecuteSummaries(tipb::SelectResponse & response);
void fillTiExecutionSummary(
tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, const String & executor_id, bool delta_mode);
void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode);
virtual void write(const Block & block) = 0;
virtual void finishWrite() = 0;
virtual ~DAGResponseWriter() = default;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::ScheduleEncodeTask()
{
tipb::SelectResponse response;
addExecuteSummaries(response);
addExecuteSummaries(response, !dag_context.isMPPTask() || dag_context.isRootMPPTask());
if (exchange_type == tipb::ExchangeType::Hash)
{
thread_pool.schedule(getEncodePartitionTask(blocks, response));
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void UnaryDAGResponseWriter::finishWrite()
{
encodeChunkToDAGResponse();
}
addExecuteSummaries(*dag_response);
addExecuteSummaries(*dag_response, false);
}

void UnaryDAGResponseWriter::write(const Block & block)
Expand Down

0 comments on commit 294c37e

Please sign in to comment.