Skip to content

Commit

Permalink
[ISSUE #461] fix global flow control problem (#469)
Browse files Browse the repository at this point in the history
* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix global flow control problem

close #461
  • Loading branch information
lrhkobe authored Jul 28, 2021
1 parent 21cca7c commit 1e637f0
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,21 @@ public void run() {
throw new Exception("eventMeshMessage is null");
}

if (!cmd.equals(RESPONSE_TO_SERVER) && !eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
synchronized (session) {
long sendTime = System.currentTimeMillis();
addTimestamp(eventMeshMessage, cmd, sendTime);

sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime);

if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage
(eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
} else {
throw new Exception(sendStatus.getDetail());
}
}
}else{
msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq()));
ctx.writeAndFlush(msg).addListener(
new ChannelFutureListener() {
Expand All @@ -80,20 +94,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.warn("======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", eventMeshTCPServer.getRateLimiter().getRate());
return;
}

synchronized (session) {
long sendTime = System.currentTimeMillis();
addTimestamp(eventMeshMessage, cmd, sendTime);

sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime);

if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", cmd, EventMeshUtil.printMqMessage
(eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
} else {
throw new Exception(sendStatus.getDetail());
}
}
} catch (Exception e) {
logger.error("MessageTransferTask failed|cmd={}|Msg={}|user={}|errMsg={}", cmd, eventMeshMessage, session.getClient(), e);
if (!cmd.equals(RESPONSE_TO_SERVER)) {
Expand Down

0 comments on commit 1e637f0

Please sign in to comment.