diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9cf0fd6d74c729a..0edc04b6992ccd9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -231,6 +231,8 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
 
 Status FragmentExecState::execute() {
     if (_need_wait_execution_trigger) {
+        // If _need_wait_execution_trigger is true, this instance has been prepared
+        // but must wait for a signal before running the rest of the execution.
        _fragments_ctx->wait_for_start();
     }
     int64_t duration_ns = 0;
@@ -640,6 +642,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             params.params.fragment_instance_id, params.backend_num,
             _exec_env, fragments_ctx));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
+        // Setting need_wait_execution_trigger means this instance will not actually be
+        // executed until the execPlanFragmentStart RPC triggers it to start.
         exec_state->set_need_wait_execution_trigger();
     }
 
@@ -703,6 +707,8 @@ void FragmentMgr::cancel_worker() {
         }
         for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
             if (it->second->is_timeout(now)) {
+                // The execution logic of the instance needs to be notified;
+                // it will then cancel the execution plan.
                 it->second->set_ready_to_execute();
                 it = _fragments_ctx_map.erase(it);
             } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index bd0a38fd0f727d2..636ad3abcd3db2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -539,6 +539,31 @@ public void exec() throws Exception {
         sendFragment();
     }
 
+    /**
+     * The logic for sending query plan fragments is as follows:
+     * First, plan fragments have dependencies. According to the order in the "fragments" list,
+     * it must be ensured that on the BE side, the next fragment instance is executed
+     * only after the previous fragment instance is ready.
+     * <p>
+     * In the previous logic, we sent fragment instances in sequence through RPCs,
+     * waiting for the RPC of the previous fragment instance to return successfully
+     * before sending the next one. For some complex queries, this may lead to too many RPCs.
+     * <p>
+     * The optimized logic is as follows:
+     * 1. If the number of fragment instances is <= 2, the original logic is still used
+     * to complete the sending of fragments through at most 2 RPCs.
+     * 2. If the number of fragment instances is >= 3, first group all fragments by BE,
+     * and send all fragment instances to the corresponding BE node through the FIRST RPC;
+     * these fragment instances will only perform the preparation phase and will not actually be executed.
+     * After that, the execution of all fragment instances is started through the SECOND RPC.
+     * <p>
+     * After optimization, a query will send at most two RPCs to each BE node,
+     * reducing the "send fragment timeout" errors caused by too many RPCs that the BE cannot process in time.
+     *
+     * @throws TException
+     * @throws RpcException
+     * @throws UserException
+     */
     private void sendFragment() throws TException, RpcException, UserException {
         lock();
         try {
@@ -585,8 +610,7 @@ private void sendFragment() throws TException, RpcException, UserException {
                 int instanceId = 0;
                 for (TExecPlanFragmentParams tParam : tParams) {
                     BackendExecState execState =
-                            new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam,
-                                    this.addressToBackendID);
+                            new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam, this.addressToBackendID);
                     // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have been executed based on this information.
                     tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
@@ -613,8 +637,7 @@ private void sendFragment() throws TException, RpcException, UserException {
             } // end for fragments
 
             // 4. send and wait fragments rpc
-            List<Pair<BackendExecStates, Future<PExecPlanFragmentResult>>> futures =
-                    Lists.newArrayList();
+            List<Pair<BackendExecStates, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
             for (BackendExecStates states : beToExecStates.values()) {
                 states.unsetFields();
                 futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
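To make the two-phase protocol in the javadoc concrete, here is a minimal, self-contained Java sketch of the pattern. `FragmentInstance`, `BackendClient`, `execPlanFragments`, and `sendFragments` are hypothetical stand-ins introduced for illustration; only `execPlanFragmentStart` corresponds to an RPC actually named in the diff. This is not the Coordinator's real implementation, just the shape of the idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the two-phase fragment sending described in the javadoc above.
// All types and method names here are hypothetical stand-ins, not Doris's real API.
public class TwoPhaseSendSketch {

    static class FragmentInstance {
        final String backendAddress;
        boolean needWaitExecutionTrigger; // prepare only; wait for the start signal

        FragmentInstance(String backendAddress) {
            this.backendAddress = backendAddress;
        }
    }

    interface BackendClient {
        // FIRST RPC: ship every instance for this BE in one batch. When
        // needWaitExecutionTrigger is set, each instance only runs its prepare phase.
        void execPlanFragments(List<FragmentInstance> instances);

        // SECOND RPC: signal the BE to start executing all prepared instances of the query.
        void execPlanFragmentStart(String queryId);
    }

    static void sendFragments(String queryId, List<FragmentInstance> instances,
                              Map<String, BackendClient> clients) {
        // With fewer than 3 instances the real code keeps its original sending path;
        // this sketch simply skips the extra trigger RPC in that case.
        boolean twoPhase = instances.size() >= 3;

        // Group instances by their target BE so each BE receives a single batched RPC.
        Map<String, List<FragmentInstance>> byBackend = new HashMap<>();
        for (FragmentInstance instance : instances) {
            instance.needWaitExecutionTrigger = twoPhase;
            byBackend.computeIfAbsent(instance.backendAddress, k -> new ArrayList<>())
                    .add(instance);
        }

        // Phase 1: one RPC per BE carrying all of its instances.
        for (Map.Entry<String, List<FragmentInstance>> entry : byBackend.entrySet()) {
            clients.get(entry.getKey()).execPlanFragments(entry.getValue());
        }

        // Phase 2: one more RPC per BE to trigger execution of the prepared instances.
        if (twoPhase) {
            for (String backend : byBackend.keySet()) {
                clients.get(backend).execPlanFragmentStart(queryId);
            }
        }
    }
}
```

The key design point the sketch mirrors: the RPC count per BE becomes constant (at most two) regardless of how many fragment instances land on that BE, which is exactly what the BE-side `need_wait_execution_trigger` / `wait_for_start()` changes in fragment_mgr.cpp make possible.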