Skip to content

Commit

Permalink
[ISSUE #457] message ack problem in retry or repush by closeSession (#…
Browse files Browse the repository at this point in the history
…459)

* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix ack problem

close #457
  • Loading branch information
lrhkobe authored Jul 28, 2021
1 parent 634262c commit 21cca7c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ private void handleUnackMsgsInSession(Session session) {
, session.getClientGroupWrapper().get().groupConsumerSessions);
if(reChooseSession != null){
downStreamMsgContext.session = reChooseSession;
reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
reChooseSession.downstreamMsg(downStreamMsgContext);
logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.session.getClient());
}else{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext) {
// }
} else {
downStreamMsgContext.session = rechoosen;
rechoosen.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
rechoosen.downstreamMsg(downStreamMsgContext);
logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public void run() {
downStreamMsgContext.ackMsg();
session.getPusher().getUnAckMsg().remove(seq);
}else {
logger.warn("MessageAckTask, seq:{}, downStreamMsgContext not in downStreamMap,client:{}", seq, session.getClient());
if(!cmd.equals(Command.RESPONSE_TO_CLIENT_ACK)) {
logger.warn("MessageAckTask, seq:{}, downStreamMsgContext not in downStreamMap,client:{}", seq, session.getClient());
}
}
messageLogger.info("pkg|c2eventMesh|cmd={}|seq=[{}]|user={}|wait={}ms|cost={}ms", cmd, seq, session.getClient(),
taskExecuteTime - startTime, System.currentTimeMillis() - startTime);
Expand Down

0 comments on commit 21cca7c

Please sign in to comment.