From 1e637f08543d75e5a94a535944d29901fda7c9fb Mon Sep 17 00:00:00 2001 From: lrhkobe <34571087+lrhkobe@users.noreply.github.com> Date: Wed, 28 Jul 2021 21:18:15 +0800 Subject: [PATCH] [ISSUE #461] fix global flow control problem (#469) * modify:optimize flow control in downstreaming msg * modify:optimize stategy of selecting session in downstream msg * modify:optimize msg downstream,msg store in session * modify:fix bug:not a @Sharable handler * modify:downstream broadcast msg asynchronously * modify:remove unneccessary interface in eventmesh-connector-api * modify:fix conflict * modify:add license in EventMeshAction * modify:fix global flow control problem close #461 --- .../tcp/client/task/MessageTransferTask.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 028214d865..7ef1b8f348 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -67,7 +67,21 @@ public void run() { throw new Exception("eventMeshMessage is null"); } - if (!cmd.equals(RESPONSE_TO_SERVER) && !eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { + if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) { + synchronized (session) { + long sendTime = System.currentTimeMillis(); + addTimestamp(eventMeshMessage, cmd, sendTime); + + sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime); + + if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { + messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage + (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime); + } else { + throw new Exception(sendStatus.getDetail()); + } + } + }else{ msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq())); ctx.writeAndFlush(msg).addListener( new ChannelFutureListener() { @@ -80,20 +94,6 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.warn("======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", eventMeshTCPServer.getRateLimiter().getRate()); return; } - - synchronized (session) { - long sendTime = System.currentTimeMillis(); - addTimestamp(eventMeshMessage, cmd, sendTime); - - sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime); - - if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { - messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage - (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime); - } else { - throw new Exception(sendStatus.getDetail()); - } - } } catch (Exception e) { logger.error("MessageTransferTask failed|cmd={}|Msg={}|user={}|errMsg={}", cmd, eventMeshMessage, session.getClient(), e); if (!cmd.equals(RESPONSE_TO_SERVER)) {