Skip to content

Commit

Permalink
remove log
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed May 29, 2022
1 parent e6aae5a commit af6e8a4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
4 changes: 0 additions & 4 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
LOG(INFO) << "cmy rpc exec_plan_fragment";
bool compact = request->has_compact() ? request->compact() : false;
PFragmentRequestVersion version = request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
st = _exec_plan_fragment(request->request(), version, compact);
Expand All @@ -125,7 +124,6 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
LOG(INFO) << "cmy rpc exec_plan_fragment_start";
brpc::ClosureGuard closure_guard(done);
auto st = _exec_env->fragment_mgr()->start_query_execution(request);
st.to_protobuf(result->mutable_status());
Expand Down Expand Up @@ -232,11 +230,9 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
}

LOG(INFO) << "cmy version 2 t_request.paramsList size: " << t_request.paramsList.size();
for (const TExecPlanFragmentParams& params : t_request.paramsList) {
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
}
sleep(1);
return Status::OK();
} else {
return Status::InternalError("invalid version");
Expand Down
16 changes: 10 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ private void sendFragment() throws TException, RpcException, UserException {
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
boolean needTrigger = fragments.size() > 2;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());

Expand Down Expand Up @@ -590,7 +591,7 @@ private void sendFragment() throws TException, RpcException, UserException {
// and the BE will determine whether all Fragments have been executed based on this information.
tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
tParam.setBackendId(execState.backend.getId());
tParam.setNeedWaitExecutionTrigger(true);
tParam.setNeedWaitExecutionTrigger(needTrigger);

backendExecStates.add(execState);
if (needCheckBackendState) {
Expand Down Expand Up @@ -620,12 +621,14 @@ private void sendFragment() throws TException, RpcException, UserException {
}
waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");

// 5. send and wait execution start rpc
futures.clear();
for (BackendExecStates states : beToExecStates.values()) {
futures.add(Pair.create(states, states.execPlanFragmentStartAsync()));
if (needTrigger) {
// 5. send and wait execution start rpc
futures.clear();
for (BackendExecStates states : beToExecStates.values()) {
futures.add(Pair.create(states, states.execPlanFragmentStartAsync()));
}
waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
}
waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");

attachInstanceProfileToFragmentProfile();
} finally {
Expand Down Expand Up @@ -2353,3 +2356,4 @@ public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
}
}
}

0 comments on commit af6e8a4

Please sign in to comment.