Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed May 29, 2022
1 parent af6e8a4 commit fb9a06e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {

Status FragmentExecState::execute() {
if (_need_wait_execution_trigger) {
// If _need_wait_execution_trigger is true, this instance has been prepared
// but must wait for the start signal before doing the rest of the execution.
_fragments_ctx->wait_for_start();
}
int64_t duration_ns = 0;
Expand Down Expand Up @@ -640,6 +642,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
params.params.fragment_instance_id,
params.backend_num, _exec_env, fragments_ctx));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// Setting need_wait_execution_trigger means this instance will not actually be executed
// until the execPlanFragmentStart RPC triggers it to start.
exec_state->set_need_wait_execution_trigger();
}

Expand Down Expand Up @@ -703,6 +707,8 @@ void FragmentMgr::cancel_worker() {
}
for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
if (it->second->is_timeout(now)) {
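// The query has timed out, so the execution logic of the instance needs to be notified
// (presumably to release anything still waiting for the start signal).
// The execution logic of the instance will then eventually cancel the execution plan.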
it->second->set_ready_to_execute();
it = _fragments_ctx_map.erase(it);
} else {
Expand Down
31 changes: 27 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,31 @@ public void exec() throws Exception {
sendFragment();
}

/**
* The logic for sending query plan fragments is as follows:
* First, plan fragments depend on each other. According to the order in the "fragments" list,
* it must be ensured that on the BE side, the next fragment instance is executed
* only after the previous fragment instance is ready.
* <p>
* In the previous logic, we sent fragment instances in sequence through RPC,
* and waited for the RPC of the previous fragment instance to return successfully
* before sending the next one. But for some complex queries, this may lead to too many RPCs.
* <p>
* The optimized logic is as follows:
* 1. If the number of fragment instances is <= 2, the original logic is still used
* to complete the sending of fragments through at most 2 RPCs.
* 2. If the number of fragment instances is >= 3, first group all fragments by BE,
* and send all fragment instances to the corresponding BE node through the FIRST RPC.
* These fragment instances will only perform the preparation phase and will not actually be executed yet.
* After that, the execution of all fragment instances is started through the SECOND RPC.
* <p>
* After optimization, at most two RPCs are sent to each BE node for a query,
* which reduces the "send fragment timeout" errors caused by too many RPCs that the BE cannot process in time.
*
* @throws TException
* @throws RpcException
* @throws UserException
*/
private void sendFragment() throws TException, RpcException, UserException {
lock();
try {
Expand Down Expand Up @@ -585,8 +610,7 @@ private void sendFragment() throws TException, RpcException, UserException {
int instanceId = 0;
for (TExecPlanFragmentParams tParam : tParams) {
BackendExecState execState =
new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam,
this.addressToBackendID);
new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam, this.addressToBackendID);
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
// and the BE will determine whether all Fragments have been executed based on this information.
tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
Expand All @@ -613,8 +637,7 @@ private void sendFragment() throws TException, RpcException, UserException {
} // end for fragments

// 4. send and wait fragments rpc
List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures =
Lists.newArrayList();
List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList();
for (BackendExecStates states : beToExecStates.values()) {
states.unsetFields();
futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
Expand Down

0 comments on commit fb9a06e

Please sign in to comment.