diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 028214d865..7ef1b8f348 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -67,7 +67,21 @@ public void run() { throw new Exception("eventMeshMessage is null"); } - if (!cmd.equals(RESPONSE_TO_SERVER) && !eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { + if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { + synchronized (session) { + long sendTime = System.currentTimeMillis(); + addTimestamp(eventMeshMessage, cmd, sendTime); + + sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime); + + if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { + messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage + (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime); + } else { + throw new Exception(sendStatus.getDetail()); + } + } + }else{ msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq())); ctx.writeAndFlush(msg).addListener( new ChannelFutureListener() { @@ -80,20 +94,6 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.warn("======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", eventMeshTCPServer.getRateLimiter().getRate()); return; } - - synchronized (session) { - long sendTime = System.currentTimeMillis(); - addTimestamp(eventMeshMessage, cmd, sendTime); - - sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime); - - if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { - messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage - (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime); - } else { - throw new Exception(sendStatus.getDetail()); - } - } } catch (Exception e) { logger.error("MessageTransferTask failed|cmd={}|Msg={}|user={}|errMsg={}", cmd, eventMeshMessage, session.getClient(), e); if (!cmd.equals(RESPONSE_TO_SERVER)) {