diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java index 900ca2d683..5b86ba3879 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java @@ -127,19 +127,6 @@ public void retry() { if (rechoosen == null) { logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt)); - -// //需要手动ack掉没有下发成功的消息 -// eventMeshAckMsg(downStreamMsgContext); - -// //重试找不到下发session不再回发broker或者重试其它eventMesh -// String bizSeqNo = finalDownStreamMsgContext.msgExt.getKeys(); -// String uniqueId = MapUtils.getString(finalDownStreamMsgContext.msgExt.getProperties(), WeMQConstant.RMB_UNIQ_ID, ""); -// if(EventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackEnabled){ -// sendMsgBackToBroker(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId); -// }else{ -// //TODO 将消息推给其它eventMesh,待定 -// sendMsgToOtherEventMesh(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId); -// } } else { this.session = rechoosen; rechoosen.downstreamMsg(this); @@ -182,8 +169,6 @@ private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) { logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS)); downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); -// ConsumeMessageService consumeMessageService = downStreamMsgContext.consumer.getDefaultMQPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService(); -// ((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); } }