diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java index 4620ce7874..6cbb130caf 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java @@ -44,7 +44,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService; import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventmeshRebalanceImpl; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer; import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; import org.apache.eventmesh.runtime.registry.Registry; import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index d64ef6bcec..3649dd0e35 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -245,6 +245,8 @@ public void onException(OnExceptionContext context) { SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(context.getException(), 2))); asyncContext.onComplete(err, handler); + + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); long endTime = System.currentTimeMillis(); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed(); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime); @@ -261,6 +263,8 @@ public void onException(OnExceptionContext context) { SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2))); asyncContext.onComplete(err); + + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); long endTime = System.currentTimeMillis(); messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index 5aa623d125..38f3f25ea6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -254,6 +254,8 @@ public void onException(Throwable e) { SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2))); asyncContext.onComplete(err, handler); + + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime, sendMessageRequestBody.getTopic(), @@ -267,6 +269,8 @@ public void onException(Throwable e) { SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2))); asyncContext.onComplete(err); + + eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000)); long endTime = System.currentTimeMillis(); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed(); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 311d6ebd85..87f7b9b4a0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -17,26 +17,8 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.group; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import com.alibaba.fastjson.JSON; - -import io.openmessaging.api.AsyncConsumeContext; -import io.openmessaging.api.AsyncMessageListener; -import io.openmessaging.api.Message; -import io.openmessaging.api.OnExceptionContext; -import io.openmessaging.api.SendCallback; -import io.openmessaging.api.SendResult; - +import io.openmessaging.api.*; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.api.EventMeshAction; @@ -53,7 +35,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext; import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -61,6 +43,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + public class ClientGroupWrapper { public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class); @@ -752,7 +740,9 @@ private void sendMsgBackToBroker(Message msg, String bizSeqNo) throws Exception String topic = msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); logger.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic); - send(new UpStreamMsgContext(null, null, msg), new SendCallback() { + long startTime = System.currentTimeMillis(); + long taskExcuteTime = startTime; + send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java index 712e6fac46..5b86ba3879 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java @@ -17,45 +17,36 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - import io.openmessaging.api.Message; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext; +import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.ServerGlobal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; -public class DownStreamMsgContext implements Delayed { +public class DownStreamMsgContext extends RetryContext { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - public String seq; - - public Message msgExt; - public Session session; public AbstractContext consumeConcurrentlyContext; public MQConsumerWrapper consumer; - public int retryTimes; - public SubscriptionItem subscriptionItem; - private long executeTime; - public long lastPushTime; private long createTime; @@ -68,11 +59,9 @@ public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper c this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet()); this.msgExt = msgExt; this.session = session; - this.retryTimes = 0; this.consumer = consumer; this.consumeConcurrentlyContext = consumeConcurrentlyContext; this.lastPushTime = System.currentTimeMillis(); - this.executeTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis(); this.subscriptionItem = subscriptionItem; String ttlStr = msgExt.getUserProperties("TTL"); @@ -99,10 +88,6 @@ public void ackMsg() { } } - public void delay(long delay) { - this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * delay; - } - @Override public String toString() { return "DownStreamMsgContext{" + @@ -119,19 +104,71 @@ public String toString() { } @Override - public int compareTo(Delayed delayed) { - DownStreamMsgContext context = (DownStreamMsgContext) delayed; - if (this.executeTime > context.executeTime) { - return 1; - } else if (this.executeTime == context.executeTime) { - return 0; - } else { - return -1; + public void retry() { + try { + logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt)); + + if (isRetryMsgTimeout(this)) { + return; + } + this.retryTimes++; + this.lastPushTime = System.currentTimeMillis(); + + Session rechoosen = null; + String topic = this.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); + if (!SubscriptionMode.BROADCASTING.equals(this.subscriptionItem.getMode())) { + rechoosen = this.session.getClientGroupWrapper() + .get().getDownstreamDispatchStrategy().select(this.session.getClientGroupWrapper().get().getSysId() + , topic + , this.session.getClientGroupWrapper().get().getGroupConsumerSessions()); + } else { + rechoosen = this.session; + } + + if (rechoosen == null) { + logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt)); + } else { + this.session = rechoosen; + rechoosen.downstreamMsg(this); + logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt)); + } + } catch (Exception e) { + logger.error("retry-dispatcher error!", e); } } - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(this.executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) { + boolean flag = false; + String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL); + long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;; + + String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME); + long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0; + String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME); + long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0; + + String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME); + long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0; + double elapseTime = brokerCost + accessCost; + if (elapseTime >= ttl) { + logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); + flag = true; + eventMeshAckMsg(downStreamMsgContext); + } + return flag; + } + + /** + * eventMesh ack msg + * + * @param downStreamMsgContext + */ + private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) { + List msgExts = new ArrayList(); + msgExts.add(downStreamMsgContext.msgExt); + logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), + downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS)); + downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); } + } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java deleted file mode 100644 index cf4e3b7ab5..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry; - -import io.openmessaging.api.Message; -import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; -import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; -import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl; -import org.apache.eventmesh.runtime.util.EventMeshUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class EventMeshTcpRetryer { - - public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class); - - private EventMeshTCPServer eventMeshTCPServer; - - private DelayQueue retrys = new DelayQueue(); - - private ThreadPoolExecutor pool = new ThreadPoolExecutor(3, - 3, - 60000, - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), - new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true), - new ThreadPoolExecutor.AbortPolicy()); - - private Thread dispatcher; - - public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) { - this.eventMeshTCPServer = eventMeshTCPServer; - } - - public EventMeshTCPServer getEventMeshTCPServer() { - return eventMeshTCPServer; - } - - public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) { - this.eventMeshTCPServer = eventMeshTCPServer; - } - - public void pushRetry(DownStreamMsgContext downStreamMsgContext) { - if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) { - logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}", - eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, downStreamMsgContext.retryTimes, - downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - return; - } - - int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) - ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes - : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes; - if (downStreamMsgContext.retryTimes >= maxRetryTimes) { - logger.warn("pushRetry fail,retry over maxRetryTimes:{}, pushType: {}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.subscriptionItem.getType(), - downStreamMsgContext.retryTimes, downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - return; - } - - retrys.offer(downStreamMsgContext); - logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, - EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - } - - public void init() { - dispatcher = new Thread(new Runnable() { - @Override - public void run() { - try { - DownStreamMsgContext downStreamMsgContext = null; - while ((downStreamMsgContext = retrys.take()) != null) { - final DownStreamMsgContext finalDownStreamMsgContext = downStreamMsgContext; - pool.execute(() -> { - retryHandle(finalDownStreamMsgContext); - }); - } - } catch (Exception e) { - logger.error("retry-dispatcher error!", e); - } - } - }, "retry-dispatcher"); - dispatcher.setDaemon(true); - logger.info("EventMeshTcpRetryer inited......"); - } - - private void retryHandle(DownStreamMsgContext downStreamMsgContext) { - try { - logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - - if (isRetryMsgTimeout(downStreamMsgContext)) { - return; - } - downStreamMsgContext.retryTimes++; - downStreamMsgContext.lastPushTime = System.currentTimeMillis(); - - Session rechoosen = null; - String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); - if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { - rechoosen = downStreamMsgContext.session.getClientGroupWrapper() - .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getSysId() - , topic - , downStreamMsgContext.session.getClientGroupWrapper().get().getGroupConsumerSessions()); - } else { - rechoosen = downStreamMsgContext.session; - } - - if (rechoosen == null) { - logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - -// //Need to manually ack the message that did not send a successful message -// eventMeshAckMsg(downStreamMsgContext); - -// //Retry cannot find the delivered session, no longer post back to the broker or retry other event Mesh -// String bizSeqNo = finalDownStreamMsgContext.msgExt.getKeys(); -// String uniqueId = MapUtils.getString(finalDownStreamMsgContext.msgExt.getProperties(), WeMQConstant.RMB_UNIQ_ID, ""); -// if(EventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackEnabled){ -// sendMsgBackToBroker(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId); -// }else{ -// // TODO: Push the message to other EventMesh instances. To be determined. -// sendMsgToOtherEventMesh(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId); -// } - } else { - downStreamMsgContext.session = rechoosen; - rechoosen.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); - rechoosen.downstreamMsg(downStreamMsgContext); - logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - } - } catch (Exception e) { - logger.error("retry-dispatcher error!", e); - } - } - - private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) { - boolean flag = false; - String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL); - long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;; - - String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME); - long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0; - String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME); - long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0; - - String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME); - long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0; - double elapseTime = brokerCost + accessCost; - if (elapseTime >= ttl) { - logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, - downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); - flag = true; - eventMeshAckMsg(downStreamMsgContext); - } - return flag; - } - - public void start() throws Exception { - dispatcher.start(); - logger.info("EventMeshTcpRetryer started......"); - } - - public void shutdown() { - pool.shutdown(); - logger.info("EventMeshTcpRetryer shutdown......"); - } - - public int getRetrySize() { - return retrys.size(); - } - - /** - * eventMesh ack msg - * - * @param downStreamMsgContext - */ - private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) { - List msgExts = new ArrayList(); - msgExts.add(downStreamMsgContext.msgExt); - logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), - downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS)); - downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); -// ConsumeMessageService consumeMessageService = downStreamMsgContext.consumer.getDefaultMQPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService(); -// ((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); - } - - public void printRetryThreadPoolState() { -// ThreadPoolHelper.printState(pool); - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java new file mode 100644 index 0000000000..c43ef4ffe2 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry; + +import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; +import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl; +import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class EventMeshTcpRetryer { + + public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class); + + private EventMeshTCPServer eventMeshTCPServer; + + private DelayQueue retrys = new DelayQueue(); + + private ThreadPoolExecutor pool = new ThreadPoolExecutor(3, + 3, + 60000, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), + new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true), + new ThreadPoolExecutor.AbortPolicy()); + + private Thread dispatcher; + + public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + public EventMeshTCPServer getEventMeshTCPServer() { + return eventMeshTCPServer; + } + + public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + public void pushRetry(RetryContext retryContext) { + if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) { + logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}", + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, retryContext.retryTimes, + retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt)); + return; + } + + int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes; + if (retryContext instanceof DownStreamMsgContext) { + DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) retryContext; + maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes : + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes; + } + + if (retryContext.retryTimes >= maxRetryTimes) { + logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, + retryContext.retryTimes, retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt)); + return; + } + + retrys.offer(retryContext); + logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", retryContext.seq, retryContext.retryTimes, + EventMeshUtil.getMessageBizSeq(retryContext.msgExt)); + } + + public void init() { + dispatcher = new Thread(new Runnable() { + @Override + public void run() { + try { + RetryContext retryContext = null; + while ((retryContext = retrys.take()) != null) { + final RetryContext finalRetryContext = retryContext; + pool.execute(() -> { + finalRetryContext.retry(); + }); + } + } catch (Exception e) { + logger.error("retry-dispatcher error!", e); + } + } + }, "retry-dispatcher"); + dispatcher.setDaemon(true); + logger.info("EventMeshTcpRetryer inited......"); + } + + public void start() throws Exception { + dispatcher.start(); + logger.info("EventMeshTcpRetryer started......"); + } + + public void shutdown() { + pool.shutdown(); + logger.info("EventMeshTcpRetryer shutdown......"); + } + + public int getRetrySize() { + return retrys.size(); + } + + public void printRetryThreadPoolState() { +// ThreadPoolHelper.printState(pool); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java new file mode 100644 index 0000000000..04d8674d7f --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry; + +import io.openmessaging.api.Message; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public abstract class RetryContext implements Delayed { + + public Message msgExt; + + public String seq; + + public int retryTimes = 0; + + public long executeTime = System.currentTimeMillis(); + + public RetryContext delay(long delay) { + this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * delay; + return this; + } + + @Override + public int compareTo(Delayed delayed) { + RetryContext obj = (RetryContext) delayed; + if (this.executeTime > obj.executeTime) { + return 1; + } else if (this.executeTime == obj.executeTime) { + return 0; + } else { + return -1; + } + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(this.executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + abstract public void retry(); +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java index 6378fa2fb7..e9e1fc53a7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java @@ -17,13 +17,8 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import io.openmessaging.api.Message; import io.openmessaging.api.SendCallback; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.RRCallback; @@ -39,6 +34,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + public class SessionSender { private final Logger messageLogger = LoggerFactory.getLogger("message"); @@ -81,7 +80,7 @@ public EventMeshTcpSendResult send(Header header, Message msg, SendCallback send Command cmd = header.getCommand(); if (Command.REQUEST_TO_SERVER == cmd) { long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; - upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg); + upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime); session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header, startTime, taskExecuteTime), ttl); upstreamBuff.release(); } else if (Command.RESPONSE_TO_SERVER == cmd) { @@ -98,11 +97,11 @@ public EventMeshTcpSendResult send(Header header, Message msg, SendCallback send // MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, msg.getProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID)); // MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO)); - upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg); + upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime); session.getClientGroupWrapper().get().reply(upStreamMsgContext); upstreamBuff.release(); } else { - upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg); + upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime); session.getClientGroupWrapper().get().send(upStreamMsgContext, sendCallback); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java index 1618ce72da..48be15c936 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java @@ -18,26 +18,43 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send; import io.openmessaging.api.Message; - +import io.openmessaging.api.OnExceptionContext; +import io.openmessaging.api.SendCallback; +import io.openmessaging.api.SendResult; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.tcp.*; +import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext; +import org.apache.eventmesh.runtime.util.EventMeshUtil; +import org.apache.eventmesh.runtime.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpStreamMsgContext extends RetryContext { -public class UpStreamMsgContext { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private Session session; - private Message msg; + private long createTime = System.currentTimeMillis(); - private String seq; + private Header header; - private long createTime = System.currentTimeMillis(); + private long startTime; + + private long taskExecuteTime; - public UpStreamMsgContext(String seq, Session session, Message msg) { - this.seq = seq; + public UpStreamMsgContext(Session session, Message msg, Header header, long startTime, long taskExecuteTime) { + this.seq = header.getSeq(); this.session = session; - this.msg = msg; + this.msgExt = msg; + this.header = header; + this.startTime = startTime; + this.taskExecuteTime = taskExecuteTime; } public Session getSession() { @@ -45,7 +62,7 @@ public Session getSession() { } public Message getMsg() { - return msg; + return msgExt; } public long getCreateTime() { @@ -55,8 +72,84 @@ public long getCreateTime() { @Override public String toString() { return "UpStreamMsgContext{seq=" + seq - + ",topic=" + msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION) + + ",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION) + ",client=" + session.getClient() - + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}"; + + ",retryTimes=" + retryTimes + + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}" + + ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT); + } + + @Override + public void retry() { + logger.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt)); + + try { + Command replyCmd = getReplyCmd(header.getCommand()); + long sendTime = System.currentTimeMillis(); + EventMeshMessage eventMeshMessage = EventMeshUtil.encodeMessage(msgExt); + EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, msgExt, + createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime); + + if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) { + logger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", header.getCommand(), EventMeshUtil.printMqMessage + (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime); + } else { + throw new Exception(sendStatus.getDetail()); + } + } catch (Exception e) { + logger.error("TCP UpstreamMsg Retry error", e); + } + } + + protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, EventMeshMessage eventMeshMessage) { + final long createTime = System.currentTimeMillis(); + Package msg = new Package(); + + return new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + session.getSender().getUpstreamBuff().release(); + logger.info("upstreamMsg message success|user={}|callback cost={}", session.getClient(), + String.valueOf(System.currentTimeMillis() - createTime)); + if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || replyCmd.equals(Command + .ASYNC_MESSAGE_TO_SERVER_ACK)) { + msg.setHeader(new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), seq)); + msg.setBody(eventMeshMessage); + Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session); + } + } + + @Override + public void onException(OnExceptionContext context) { + session.getSender().getUpstreamBuff().release(); + + // retry + UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext( + session, EventMeshUtil.decodeMessage(eventMeshMessage), header, startTime, taskExecuteTime); + upStreamMsgContext.delay(10000); + session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext); + + session.getSender().failMsgCount.incrementAndGet(); + logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf + (System.currentTimeMillis() - createTime), new Exception(context.getException())); + msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), seq)); + msg.setBody(eventMeshMessage); + Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session); + } + + }; + } + + private Command getReplyCmd(Command cmd) { + switch (cmd) { + case REQUEST_TO_SERVER: + return Command.RESPONSE_TO_CLIENT; + case ASYNC_MESSAGE_TO_SERVER: + return Command.ASYNC_MESSAGE_TO_SERVER_ACK; + case BROADCAST_MESSAGE_TO_SERVER: + return Command.BROADCAST_MESSAGE_TO_SERVER_ACK; + default: + return cmd; + } } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 87eeed0993..b0c79fa4a9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -17,34 +17,31 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.task; -import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; - -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.OPStatus; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.runtime.acl.Acl; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + +import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; + public class MessageTransferTask extends AbstractTask { private final Logger messageLogger = LoggerFactory.getLogger("message"); @@ -158,6 +155,13 @@ public void onSuccess(SendResult sendResult) { @Override public void onException(OnExceptionContext context) { session.getSender().getUpstreamBuff().release(); + + // retry + UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext( + session, EventMeshUtil.decodeMessage(eventMeshMessage), pkg.getHeader(), startTime, taskExecuteTime); + upStreamMsgContext.delay(10000); + session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext); + session.getSender().failMsgCount.incrementAndGet(); messageLogger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf (System.currentTimeMillis() - createTime), new Exception(context.getException()));