[Improvement](profile) Provide more info for schedule time (apache#38290)
Gabriel39 authored Jul 24, 2024
1 parent 8e17d59 commit 21ddfe2
Showing 4 changed files with 121 additions and 9 deletions.
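The patch stamps each fragment-dispatch RPC at five points: the FE records the send time just before the async call, the BE records received_time when the RPC arrives, execution_time when the bthread starts the work, and execution_done_time when the work finishes, and the FE records when the response comes back. The sketch below is illustrative only (the class and parameter names are invented) and mirrors the arithmetic that Coordinator.waitPipelineRpc() applies to those timestamps in the diff further down; the first and last segments are cross-host differences, so they assume the FE and BE clocks are reasonably in sync.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of this commit: shows how the five timestamps
// are turned into the four per-BE segments that end up in the query profile.
final class ScheduleLatencySketch {
    static List<Long> segments(long feSendMs,       // FE, before execRemoteFragmentsAsync()
                               long beReceivedMs,   // BE, received_time
                               long beExecStartMs,  // BE, execution_time
                               long beExecDoneMs,   // BE, execution_done_time
                               long feRpcDoneMs) {  // FE, when the Future completes
        return Arrays.asList(
                beReceivedMs - feSendMs,       // RPC latency from FE to BE
                beExecStartMs - beReceivedMs,  // time spent queued in the work pool
                beExecDoneMs - beExecStartMs,  // time spent doing the actual work
                feRpcDoneMs - beExecDoneMs);   // RPC latency from BE to FE
    }
}
```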
21 changes: 21 additions & 0 deletions be/src/service/internal_service.cpp
@@ -288,6 +288,9 @@ void PInternalService::exec_plan_fragment(google::protobuf::RpcController* contr
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
timeval tv {};
gettimeofday(&tv, nullptr);
response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
_exec_plan_fragment_in_pthread(controller, request, response, done);
});
@@ -301,6 +304,9 @@ void PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcContr
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
timeval tv1 {};
gettimeofday(&tv1, nullptr);
response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
bool compact = request->has_compact() ? request->compact() : false;
@@ -318,12 +324,18 @@ void PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcContr
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
}
st.to_protobuf(response->mutable_status());
timeval tv2 {};
gettimeofday(&tv2, nullptr);
response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
}

void PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
timeval tv {};
gettimeofday(&tv, nullptr);
response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
_exec_plan_fragment_in_pthread(controller, request, response, done);
});
@@ -337,10 +349,19 @@ void PInternalService::exec_plan_fragment_start(google::protobuf::RpcController*
const PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) {
timeval tv {};
gettimeofday(&tv, nullptr);
result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
bool ret = _light_work_pool.try_offer([this, request, result, done]() {
timeval tv1 {};
gettimeofday(&tv1, nullptr);
result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
brpc::ClosureGuard closure_guard(done);
auto st = _exec_env->fragment_mgr()->start_query_execution(request);
st.to_protobuf(result->mutable_status());
timeval tv2 {};
gettimeofday(&tv2, nullptr);
result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
});
if (!ret) {
offer_failed(result, done, _light_work_pool);
64 changes: 64 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -20,13 +20,18 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.TransactionType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
@@ -50,6 +55,7 @@ public class SummaryProfile {
public static final String IS_NEREIDS = "Is Nereids";
public static final String TOTAL_INSTANCES_NUM = "Total Instances Num";
public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE";
public static final String SCHEDULE_TIME_PER_BE = "Schedule Time Of BE";
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num";
public static final String TRACE_ID = "Trace ID";
public static final String WORKLOAD_GROUP = "Workload Group";
@@ -102,6 +108,10 @@ public class SummaryProfile {
public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Count";
public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time";
public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Count";
public static final String LATENCY_FROM_FE_TO_BE = "RPC Latency From FE To BE";
public static final String RPC_QUEUE_TIME = "RPC Work Queue Time";
public static final String RPC_WORK_TIME = "RPC Work Time";
public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";

// These info will display on FE's web ui table, every one will be displayed as
// a column, so that should not
@@ -145,6 +155,7 @@ public class SummaryProfile {
SEND_FRAGMENT_PHASE2_TIME,
FRAGMENT_COMPRESSED_SIZE,
FRAGMENT_RPC_COUNT,
SCHEDULE_TIME_PER_BE,
WAIT_FETCH_RESULT_TIME,
FETCH_RESULT_TIME,
WRITE_RESULT_TIME,
@@ -256,6 +267,11 @@ public class SummaryProfile {
private long filesystemDeleteDirCnt = 0;
private TransactionType transactionType = TransactionType.UNKNOWN;

// BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
// to FE)
private Map<TNetworkAddress, List<Long>> rpcPhase1Latency;
private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;

public SummaryProfile() {
summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
executionSummaryProfile = new RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
@@ -344,6 +360,7 @@ private void updateExecutionSummaryProfile() {
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME_PER_BE, getRpcLatency());
executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
@@ -550,6 +567,14 @@ public long getQueryBeginTime() {
return queryBeginTime;
}

public void setRpcPhase1Latency(Map<TNetworkAddress, List<Long>> rpcPhase1Latency) {
this.rpcPhase1Latency = rpcPhase1Latency;
}

public void setRpcPhase2Latency(Map<TNetworkAddress, List<Long>> rpcPhase2Latency) {
this.rpcPhase2Latency = rpcPhase2Latency;
}

public static class SummaryBuilder {
private Map<String, String> map = Maps.newHashMap();

@@ -759,4 +784,43 @@ public void incDeleteDirRecursiveCnt() {
public void incDeleteFileCnt() {
this.filesystemDeleteFileCnt += 1;
}

private String getRpcLatency() {
Map<String, Map<String, Map<String, String>>> jsonObject = new HashMap<>();
if (rpcPhase1Latency != null) {
Map<String, Map<String, String>> latencyForPhase1 = new HashMap<>();
for (TNetworkAddress key : rpcPhase1Latency.keySet()) {
Preconditions.checkState(rpcPhase1Latency.get(key).size() == 4, "rpc latency should have 4 elements");
Map<String, String> latency = new HashMap<>();
latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(0),
TUnit.TIME_MS));
latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(1),
TUnit.TIME_MS));
latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(2),
TUnit.TIME_MS));
latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(3),
TUnit.TIME_MS));
latencyForPhase1.put(key.getHostname() + ": " + key.getPort(), latency);
}
jsonObject.put("phase1", latencyForPhase1);
}
if (rpcPhase2Latency != null) {
Map<String, Map<String, String>> latencyForPhase2 = new HashMap<>();
for (TNetworkAddress key : rpcPhase2Latency.keySet()) {
Preconditions.checkState(rpcPhase2Latency.get(key).size() == 4, "rpc latency should have 4 elements");
Map<String, String> latency = new HashMap<>();
latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(0),
TUnit.TIME_MS));
latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(1),
TUnit.TIME_MS));
latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(2),
TUnit.TIME_MS));
latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(3),
TUnit.TIME_MS));
latencyForPhase2.put(key.getHostname() + ": " + key.getPort(), latency);
}
jsonObject.put("phase2", latencyForPhase2);
}
return new Gson().toJson(jsonObject);
}
}
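getRpcLatency() serializes the two per-phase maps with Gson, so the SCHEDULE_TIME_PER_BE line in the execution summary carries nested JSON keyed first by phase and then by "host: port". A hypothetical example of the resulting value is shown below; the address and durations are invented, map ordering is whatever the HashMaps produce, and the formatted durations come from RuntimeProfile.printCounter().

```json
{
  "phase1": {
    "10.0.0.1: 8060": {
      "RPC Latency From FE To BE": "2ms",
      "RPC Work Queue Time": "1ms",
      "RPC Work Time": "15ms",
      "RPC Latency From BE To FE": "3ms"
    }
  },
  "phase2": {
    "10.0.0.1: 8060": {
      "RPC Latency From FE To BE": "1ms",
      "RPC Work Queue Time": "0ms",
      "RPC Work Time": "4ms",
      "RPC Latency From BE To FE": "2ms"
    }
  }
}
```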
39 changes: 30 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -140,6 +140,7 @@
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.jetbrains.annotations.NotNull;
import org.joda.time.DateTime;

import java.security.SecureRandom;
import java.time.LocalDateTime;
@@ -893,37 +894,44 @@ private void sendPipelineCtx() throws TException, RpcException, UserException {
updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());

// 4.2 send fragments rpc
List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
futures = Lists.newArrayList();
List<Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>>>> futures = Lists.newArrayList();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug(ctxs.debugInfo());
}
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
futures.add(Pair.of(DateTime.now().getMillis(),
ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy))));
}
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
Map<TNetworkAddress, List<Long>> rpcPhase1Latency =
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");

updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency));

if (twoPhaseExecution) {
// 5. send and wait execution start rpc
futures.clear();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
futures.add(Pair.of(DateTime.now().getMillis(),
ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy))));
}
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
Map<TNetworkAddress, List<Long>> rpcPhase2Latency =
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(),
"send execution start");
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency));
}
} finally {
unlock();
}
}

private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
private Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple<PipelineExecContexts,
BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>> futures, long leftTimeMs,
String operation) throws RpcException, UserException {
if (leftTimeMs <= 0) {
long currentTimeMillis = System.currentTimeMillis();
@@ -944,14 +952,26 @@ private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServicePro
throw new UserException(msg);
}

// BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
// to FE)
Map<TNetworkAddress, List<Long>> beToPrepareLatency = new HashMap<>();
long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
for (Triple<PipelineExecContexts, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) {
for (Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>>> pair : futures) {
Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>> triple = pair.second;
TStatusCode code;
String errMsg = null;
Exception exception = null;

try {
PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
long rpcDone = DateTime.now().getMillis();
beToPrepareLatency.put(triple.getLeft().brpcAddr,
Lists.newArrayList(result.getReceivedTime() - pair.first,
result.getExecutionTime() - result.getReceivedTime(),
result.getExecutionDoneTime() - result.getExecutionTime(),
rpcDone - result.getExecutionDoneTime()));
code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code == null) {
code = TStatusCode.INTERNAL_ERROR;
@@ -1003,6 +1023,7 @@ private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServicePro
}
}
}
return beToPrepareLatency;
}

public List<String> getExportFiles() {
6 changes: 6 additions & 0 deletions gensrc/proto/internal_service.proto
@@ -246,6 +246,12 @@ message PExecPlanFragmentStartRequest {

message PExecPlanFragmentResult {
required PStatus status = 1;
// BE receive rpc
optional int64 received_time = 2;
// Start executing on bthread
optional int64 execution_time = 3;
// Done on bthread
optional int64 execution_done_time = 4;
};

message PCancelPlanFragmentRequest {
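All three timestamps are declared optional, so a response from a BE built before this change simply leaves them unset. The snippet below is an illustrative pattern rather than code from this commit: it assumes the proto2-generated Java class org.apache.doris.proto.InternalService and uses its presence accessors to decide whether a per-BE latency entry can be computed at all.

```java
import org.apache.doris.proto.InternalService;

// Illustrative only: a caller could skip the latency entry for a backend that
// did not report the new timestamps instead of subtracting default zeros.
final class ScheduleTimestampCheck {
    static boolean reportedScheduleTimestamps(InternalService.PExecPlanFragmentResult result) {
        return result.hasReceivedTime()
                && result.hasExecutionTime()
                && result.hasExecutionDoneTime();
    }
}
```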
