Skip to content

Commit

Permalink
Merge pull request #1037 from mytang0/fix_patch_tcp
Browse files Browse the repository at this point in the history
[ISSUE #1035 #1036] Fix patch TCP
close #1035 
close #1036
  • Loading branch information
xwm1992 authored Aug 3, 2022
2 parents 5cb8531 + da5957a commit 48254e8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,11 @@ public boolean isAvailable(String topic) {
return true;
}

public boolean isRunning() {
if (SessionState.RUNNING != sessionState) {
logger.warn("session is not running, state:{} client:{}", sessionState, client);
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,26 @@ public void retry() {
Command replyCmd = getReplyCmd(header.getCmd());
long sendTime = System.currentTimeMillis();

EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
retryTimes++;

if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
// check session availability
if (session.isRunning()) {
EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
createSendCallback(replyCmd, taskExecuteTime, event, this), startTime, taskExecuteTime);

if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
} else {
throw new Exception(sendStatus.getDetail());
} else {
throw new Exception(sendStatus.getDetail());
}
}
} catch (Exception e) {
logger.error("TCP UpstreamMsg Retry error", e);
}
}

protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, CloudEvent event) {
protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, CloudEvent event, UpStreamMsgContext retryContext) {
final long createTime = System.currentTimeMillis();
Package msg = new Package();

Expand All @@ -129,10 +134,9 @@ public void onException(OnExceptionContext context) {
session.getSender().getUpstreamBuff().release();

// retry
UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
session, event, header, startTime, taskExecuteTime);
upStreamMsgContext.delay(10000);
session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
// reset delay time
retryContext.delay(10000);
session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(retryContext);

session.getSender().failMsgCount.incrementAndGet();
logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(),
Expand Down

0 comments on commit 48254e8

Please sign in to comment.