From 407e224544ed08934784d4e50c331a92f9b71fd0 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 20 May 2022 17:07:47 +0800 Subject: [PATCH] fix bug be part 3 4 fix bug --- be/src/service/internal_service.cpp | 36 ++++-- be/src/service/internal_service.h | 2 +- .../java/org/apache/doris/common/Config.java | 9 -- .../java/org/apache/doris/qe/Coordinator.java | 115 +++++++++++------- .../doris/qe/InsertStreamTxnExecutor.java | 7 +- .../apache/doris/rpc/BackendServiceProxy.java | 16 ++- .../load/sync/canal/CanalSyncDataTest.java | 16 +-- gensrc/proto/internal_service.proto | 6 + gensrc/thrift/PaloInternalService.thrift | 4 + 9 files changed, 132 insertions(+), 79 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index bfdd1f31ba3eb3f..35380e1fd9fe0be 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -111,7 +111,8 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; - st = _exec_plan_fragment(request->request(), compact); + PFragmentRequestVersion version = request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1; + st = _exec_plan_fragment(request->request(), version, compact); if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg(); } @@ -201,14 +202,33 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } } -Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, bool compact) { - TExecPlanFragmentParams t_request; - { - const uint8_t* buf = (const uint8_t*)ser_request.data(); - uint32_t len = ser_request.size(); - RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); +Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, PFragmentRequestVersion version, bool compact) { + if (version == PFragmentRequestVersion::VERSION_1) { + // VERSION_1 should be removed in v1.2 + TExecPlanFragmentParams t_request; + { + const uint8_t* buf = (const uint8_t*)ser_request.data(); + uint32_t len = ser_request.size(); + RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); + } + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + } else if (version == PFragmentRequestVersion::VERSION_2) { + TExecPlanFragmentParamsList t_request; + { + const uint8_t* buf = (const uint8_t*)ser_request.data(); + uint32_t len = ser_request.size(); + RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); + } + + LOG(INFO) << "cmy version 2 t_request.paramsList size: " << t_request.paramsList.size(); + for (const TExecPlanFragmentParams& params : t_request.paramsList) { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + } + sleep(1); + return Status::OK(); + } else { + return Status::InternalError("invalid version"); } - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); } void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 18a1667f56dc334..38cd8d28cc00a99 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -121,7 +121,7 @@ class PInternalServiceImpl : public PBackendService { PHandShakeResponse* response, google::protobuf::Closure* done) override; private: - Status _exec_plan_fragment(const std::string& s_request, bool compact); + Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, bool compact); Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index cff7f2b15444559..e45f384f87b7046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1046,15 +1046,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_local_replica_selection_fallback = false; - - /** - * The timeout of executing async remote fragment. - * In normal case, the async remote fragment will be executed in a short time. If system are under high load - * condition,try to set this timeout longer. - */ - @ConfField(mutable = true) - public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec - /** * The number of query retries. * A query may retry if we encounter RPC exception and no result has been sent to user. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5e03947a87671ba..fc6ff6891a668b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -70,6 +70,7 @@ import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TLoadErrorHubInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; @@ -548,14 +549,12 @@ private void sendFragment() throws TException, RpcException, UserException { for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); - // set up exec states + // 1. set up exec states int instanceNum = params.instanceExecParams.size(); Preconditions.checkState(instanceNum > 0); List tParams = params.toThrift(backendIdx); - List>> futures = - Lists.newArrayList(); - // update memory limit for colocate join + // 2. update memory limit for colocate join if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) { int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum); long newMemory = memoryLimit / rate; @@ -574,7 +573,9 @@ private void sendFragment() throws TException, RpcException, UserException { needCheckBackendState = true; } + // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE. int instanceId = 0; + Map beToExecStates = Maps.newHashMap(); for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam, this.addressToBackendID); @@ -592,18 +593,30 @@ private void sendFragment() throws TException, RpcException, UserException { fragment.getFragmentId().asInt(), jobId); } } - futures.add(Pair.create(execState, execState.execRemoteFragmentAsync())); - backendIdx++; + BackendExecStates states = beToExecStates.get(execState.backend.getId()); + if (states == null) { + states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress); + beToExecStates.putIfAbsent(execState.backend.getId(), states); + } + states.addState(execState); + } + + // 4. send fragments + List>> futures = + Lists.newArrayList(); + for (BackendExecStates states : beToExecStates.values()) { + futures.add(Pair.create(states, states.execRemoteFragmentsAsync())); } - for (Pair> pair : futures) { + // 5. wait send fragment rpc finish + for (Pair> pair : futures) { TStatusCode code; String errMsg = null; Exception exception = null; try { - InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms, - TimeUnit.MILLISECONDS); + InternalService.PExecPlanFragmentResult result = + pair.second.get(queryOptions.query_timeout, TimeUnit.SECONDS); code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (!result.getStatus().getErrorMsgsList().isEmpty()) { errMsg = result.getStatus().getErrorMsgsList().get(0); @@ -612,7 +625,7 @@ private void sendFragment() throws TException, RpcException, UserException { LOG.warn("catch a execute exception", e); exception = e; code = TStatusCode.THRIFT_RPC_ERROR; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress); + BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr); } catch (InterruptedException e) { LOG.warn("catch a interrupt exception", e); exception = e; @@ -621,7 +634,6 @@ private void sendFragment() throws TException, RpcException, UserException { LOG.warn("catch a timeout exception", e); exception = e; code = TStatusCode.TIMEOUT; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress); } if (code != TStatusCode.OK) { @@ -630,28 +642,26 @@ private void sendFragment() throws TException, RpcException, UserException { } if (errMsg == null) { - errMsg = "exec rpc error. backend id: " + pair.first.backend.getId(); + errMsg = "exec rpc error. backend id: " + pair.first.beId; } queryStatus.setStatus(errMsg); - LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}", - errMsg, code, fragment.getFragmentId(), - pair.first.address.hostname, pair.first.address.port); + LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}", errMsg, + code, fragment.getFragmentId(), pair.first.brpcAddr); cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: - throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: " - + pair.first.backend.getId() + " fragment: " - + DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id)); + throw new RpcException(pair.first.brpcAddr.hostname, + "send fragment timeout. backend id: " + pair.first.beId); case THRIFT_RPC_ERROR: - SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg); - throw new RpcException(pair.first.backend.getHost(), "rpc failed"); + SimpleScheduler.addToBlacklist(pair.first.beId, errMsg); + throw new RpcException(pair.first.brpcAddr.hostname, "rpc failed"); default: throw new UserException(errMsg); } } // succeed to send the plan fragment, update the "alreadySentBackendIds" - alreadySentBackendIds.add(pair.first.backend.getId()); + alreadySentBackendIds.add(pair.first.beId); } profileFragmentId += 1; @@ -1918,6 +1928,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId); this.address = fi.host; this.backend = idToBackend.get(addressToBackendID.get(address)); + this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; this.profile = new RuntimeProfile(name); @@ -2016,15 +2027,40 @@ public boolean isBackendStateHealthy() { return true; } - public Future execRemoteFragmentAsync() throws TException, RpcException { - try { - brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - } catch (Exception e) { - throw new TException(e.getMessage()); - } - this.initiated = true; + public FragmentInstanceInfo buildFragmentInstanceInfo() { + return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId()) + .fragmentId(String.valueOf(fragmentId)).address(this.address).build(); + } + + private TUniqueId fragmentInstanceId() { + return this.rpcParams.params.getFragmentInstanceId(); + } + } + + /** + * A set of BackendExecState for same Backend + */ + public class BackendExecStates { + long beId; + TNetworkAddress brpcAddr; + List states = Lists.newArrayList(); + + public BackendExecStates(long beId, TNetworkAddress brpcAddr) { + this.beId = beId; + this.brpcAddr = brpcAddr; + } + + public void addState(BackendExecState state) { + this.states.add(state); + } + + public Future execRemoteFragmentsAsync() throws TException { try { - return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams); + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + for (BackendExecState state : states) { + paramsList.addToParamsList(state.rpcParams); + } + return BackendServiceProxy.getInstance().execPlanFragmentsAsync(brpcAddr, paramsList); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. @@ -2046,13 +2082,10 @@ public boolean isDone() { @Override public InternalService.PExecPlanFragmentResult get() { - InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult - .newBuilder() - .setStatus(Types.PStatus.newBuilder() - .addErrorMsgs(e.getMessage()) - .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()) - .build()) - .build(); + InternalService.PExecPlanFragmentResult result = + InternalService.PExecPlanFragmentResult.newBuilder().setStatus( + Types.PStatus.newBuilder().addErrorMsgs(e.getMessage()).setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()) + .build(); return result; } @@ -2063,16 +2096,6 @@ public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) }; } } - - public FragmentInstanceInfo buildFragmentInstanceInfo() { - return new QueryStatisticsItem.FragmentInstanceInfo.Builder() - .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address) - .build(); - } - - private TUniqueId fragmentInstanceId() { - return this.rpcParams.params.getFragmentInstanceId(); - } } // execution parameters for a single fragment, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index ea0cb6cfda03305..5160769de1722cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -30,6 +30,7 @@ import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeParams; @@ -85,8 +86,10 @@ public void beginTransaction(TStreamLoadPutRequest request) throws UserException txnEntry.setBackend(backend); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { - Future future = BackendServiceProxy.getInstance().execPlanFragmentAsync( - address, tRequest); + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + paramsList.addToParamsList(tRequest); + Future future = + BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList); InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index f1ad320dcefe432..e49f8d362f04ce0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -20,13 +20,14 @@ import org.apache.doris.common.Config; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; -import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFoldConstantParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -92,17 +93,20 @@ private BackendServiceClient getProxy(TNetworkAddress address) { } } - public Future execPlanFragmentAsync( - TNetworkAddress address, TExecPlanFragmentParams tRequest) + public Future execPlanFragmentsAsync( + TNetworkAddress address, TExecPlanFragmentParamsList paramsList) throws TException, RpcException { - InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder(); + InternalService.PExecPlanFragmentRequest.Builder builder = + InternalService.PExecPlanFragmentRequest.newBuilder(); if (Config.use_compact_thrift_rpc) { - builder.setRequest(ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(tRequest))); + builder.setRequest( + ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList))); builder.setCompact(true); } else { - builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build(); + builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(paramsList))).build(); builder.setCompact(false); } + builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2); final InternalService.PExecPlanFragmentRequest pRequest = builder.build(); try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 727644a6a315bf2..509507908780a98 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -31,6 +31,7 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TStorageMedium; @@ -46,10 +47,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import mockit.Expectations; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.Assert; @@ -62,6 +60,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; public class CanalSyncDataTest { private static final Logger LOG = LogManager.getLogger(CanalSyncDataTest.class); @@ -252,7 +254,7 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; @@ -323,7 +325,7 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; @@ -389,7 +391,7 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ae2d1874124df62..7c5e5c27255b8fb 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -163,9 +163,15 @@ message PTabletWriterCancelRequest { message PTabletWriterCancelResult { }; +enum PFragmentRequestVersion { + VERSION_1 = 1; // only one TExecPlanFragmentParams in request + VERSION_2 = 2; // multi TExecPlanFragmentParams in request +}; + message PExecPlanFragmentRequest { optional bytes request = 1; optional bool compact = 2; + optional PFragmentRequestVersion version = 3 [default = VERSION_2]; }; message PExecPlanFragmentResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4787513baff845a..069b23d09efeae9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -346,6 +346,10 @@ struct TExecPlanFragmentParams { 19: optional TGlobalDict global_dict // scan node could use the global dict to encode the string value to an integer } +struct TExecPlanFragmentParamsList { + 1: optional list paramsList; +} + struct TExecPlanFragmentResult { // required in V1 1: optional Status.TStatus status