Skip to content

Commit

Permalink
[ISSUE #350]optimize flow control in downstreaming msg (#352)
Browse files Browse the repository at this point in the history
* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler
  • Loading branch information
lrhkobe authored May 31, 2021
1 parent 764abe3 commit 1ca80fb
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 252 deletions.
2 changes: 1 addition & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ eventMesh.server.session.expiredInMills=60000
# flow control, include the global level and session level
eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20
eventMesh.server.session.downstreamUnackSize=100

# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
Expand Down Expand Up @@ -108,10 +104,6 @@ public void setRateLimiter(RateLimiter rateLimiter) {

private RateLimiter rateLimiter;

private EventMeshTcpMessageDispatcher eventMeshTcpMessageDispatcher = new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this);
private EventMeshTcpExceptionHandler eventMeshTcpExceptionHandler = new EventMeshTcpExceptionHandler(EventMeshTCPServer.this);


public EventMeshTCPServer(EventMeshServer eventMeshServer,
EventMeshTCPConfiguration eventMeshTCPConfiguration) {
super();
Expand Down Expand Up @@ -147,8 +139,8 @@ public void initChannel(Channel ch) throws Exception {
.addLast(workerGroup, new IdleStateHandler(eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
eventMeshTcpMessageDispatcher,
eventMeshTcpExceptionHandler
new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int eventMeshTcpSessionUpstreamBufferSize = 100;

public int eventMeshTcpSessionDownstreamUnackSize = 12;

public int eventMeshTcpMsgRetryTimes = 3;

public int eventMeshTcpMsgRetryDelayInMills = 500;
Expand All @@ -71,8 +69,6 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int eventMeshTcpPushFailIsolateTimeInMills = 30 * 1000;

public int eventMeshTcpDownStreamMapSize = 500;

private TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
private TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);

Expand Down Expand Up @@ -145,12 +141,6 @@ public void init() {
eventMeshTcpSessionUpstreamBufferSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpSessionUpstreamBufferSizeStr));
}

String eventMeshTcpSessionDownstreamUnackSizeStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE);
if (StringUtils.isNotEmpty(eventMeshTcpSessionDownstreamUnackSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpSessionDownstreamUnackSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_SESSION_DOWNSTREAM_UNACK_SIZE));
eventMeshTcpSessionDownstreamUnackSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpSessionDownstreamUnackSizeStr));
}

//========================================eventMesh retry config=============================================//
String eventMeshTcpMsgRetryTimesStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_RETRY_PUSH_RETRY_TIMES);
if (StringUtils.isNotEmpty(eventMeshTcpMsgRetryTimesStr)) {
Expand Down Expand Up @@ -194,12 +184,6 @@ public void init() {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpPushFailIsolateTimeInMillsStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME));
eventMeshTcpPushFailIsolateTimeInMills = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpPushFailIsolateTimeInMillsStr));
}

String eventMeshTcpDownStreamMapSizeStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_TCP_DOWNSTREAM_MAP_SIZE);
if (StringUtils.isNotEmpty(eventMeshTcpDownStreamMapSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(eventMeshTcpDownStreamMapSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_TCP_DOWNSTREAM_MAP_SIZE));
eventMeshTcpDownStreamMapSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshTcpDownStreamMapSizeStr));
}
}

public TrafficShapingConfig getGtc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ public class ClientGroupWrapper {

private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();

private ConcurrentHashMap<String, DownStreamMsgContext> downstreamMap = new ConcurrentHashMap<String, DownStreamMsgContext>();

public ConcurrentHashMap<String, DownStreamMsgContext> getDownstreamMap() {
return downstreamMap;
}

public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);

public ClientGroupWrapper(String sysId, String dcn,
Expand Down Expand Up @@ -538,6 +532,9 @@ public void consume(Message message, AsyncConsumeContext context) {

Iterator<Session> sessionsItr = groupConsumerSessions.iterator();

DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(message, null, broadCastMsgConsumer, broadCastMsgConsumer.getContext(), false);

while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();

Expand All @@ -546,18 +543,10 @@ public void consume(Message message, AsyncConsumeContext context) {
continue;
}

DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(message, session, broadCastMsgConsumer, broadCastMsgConsumer.getContext(), false);

if (session.isCanDownStream()) {
session.downstreamMsg(downStreamMsgContext);
continue;
}

logger.warn("downstream broadcast msg,session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 0 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
downStreamMsgContext.delay(delayTime);
eventMeshTcpRetryer.pushRetry(downStreamMsgContext);
downStreamMsgContext.session = session;
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
}

// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down Expand Up @@ -588,14 +577,14 @@ public void consume(Message message, AsyncConsumeContext context) {
sendBackFromEventMeshIp = message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}

logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}", groupName, topic, bizSeqNo);
logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", groupName, topic, bizSeqNo, sendBackTimes, sendBackFromEventMeshIp);

if (sendBackTimes >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes, groupName, topic, bizSeqNo);
} else {
sendBackTimes++;
message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_IP, sendBackFromEventMeshIp);
message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_IP, eventMeshTCPConfiguration.eventMeshServerIp);
sendMsgBackToBroker(message, bizSeqNo);
}
} catch (Exception e) {
Expand All @@ -610,26 +599,9 @@ public void consume(Message message, AsyncConsumeContext context) {

DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(message, session, persistentMsgConsumer, persistentMsgConsumer.getContext(), false);

if (downstreamMap.size() < eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpDownStreamMapSize) {
downstreamMap.putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
} else {
logger.warn("downStreamMap is full,group:{}", groupName);
}

if (session.isCanDownStream()) {
session.downstreamMsg(downStreamMsgContext);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
return;
}

logger.warn("session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), bizSeqNo);
long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 0 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
downStreamMsgContext.delay(delayTime);
eventMeshTcpRetryer.pushRetry(downStreamMsgContext);

//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
Expand Down Expand Up @@ -669,28 +641,11 @@ public synchronized void shutdownPersistentConsumer() throws Exception {
}

public Set<Session> getGroupConsumerSessions() {
Set<Session> res = null;
try {
this.groupLock.readLock().lockInterruptibly();
res = groupConsumerSessions;
} catch (Exception e) {
} finally {
this.groupLock.readLock().unlock();
}
return res;
return groupConsumerSessions;
}


public Set<Session> getGroupProducerSessions() {
Set<Session> res = null;
try {
this.groupLock.readLock().lockInterruptibly();
res = groupProducerSessions;
} catch (Exception e) {
} finally {
this.groupLock.readLock().unlock();
}
return res;
return groupProducerSessions;
}

public void setGroupName(String groupName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -45,7 +42,6 @@
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.FreePriorityDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.ClientAckContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
Expand Down Expand Up @@ -298,21 +294,24 @@ private void cleanSubscriptionInSession(Session session) throws Exception {
* @param session
*/
private void handleUnackMsgsInSession(Session session) {
ConcurrentHashMap<String /** seq */, ClientAckContext> unAckMsg = session.getPusher().getPushContext().getUnAckMsg();
ConcurrentHashMap<String /** seq */, DownStreamMsgContext> unAckMsg = session.getPusher().getUnAckMsg();
if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) {
for (Map.Entry<String, ClientAckContext> entry : unAckMsg.entrySet()) {
ClientAckContext ackContext = entry.getValue();
if (EventMeshUtil.isBroadcast(ackContext.getMsgs().get(0).getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", ackContext.getSeq(), EventMeshUtil.getMessageBizSeq(ackContext.getMsgs().get(0)), session.getClient());
for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
DownStreamMsgContext downStreamMsgContext = entry.getValue();
if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt), session.getClient());
continue;
}
List<Session> list = new ArrayList<>(session.getClientGroupWrapper().get().getGroupConsumerSessions());
Collections.shuffle(list);
DownStreamMsgContext downStreamMsgContext = new DownStreamMsgContext(ackContext.getMsgs().get(0), list.get(0), ackContext.getConsumer(), ackContext.getContext(), false);

downStreamMsgContext.delay(0L);
eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
logger.warn("rePush msg form unAckMsgs,seq:{},rePushSeq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.seq, downStreamMsgContext.session.getClient());
Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(session.getClientGroupWrapper().get().getGroupName()
, downStreamMsgContext.msgExt.getTopic()
, session.getClientGroupWrapper().get().groupConsumerSessions);
if(reChooseSession != null){
downStreamMsgContext.session = reChooseSession;
reChooseSession.downstreamMsg(downStreamMsgContext);
logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.session.getClient());
}else{
logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), downStreamMsgContext.msgExt.getTopic());
}
}
}
}
Expand Down Expand Up @@ -371,46 +370,25 @@ public void run() {
}, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, TimeUnit.MILLISECONDS);
}

private void initSessionAckContextCleaner() {
private void initDownStreamMsgContextCleaner() {
eventMeshTCPServer.getScheduler().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {

//scan non-broadcast msg
Iterator<Session> sessionIterator = sessionTable.values().iterator();
while (sessionIterator.hasNext()) {
Session tmp = sessionIterator.next();
for (Map.Entry<String, ClientAckContext> entry : tmp.getPusher().getPushContext().getUnAckMsg().entrySet()) {
for (Map.Entry<String, DownStreamMsgContext> entry : tmp.getPusher().getUnAckMsg().entrySet()) {
String seqKey = entry.getKey();
ClientAckContext clientAckContext = entry.getValue();
if (!clientAckContext.isExpire()) {
continue;
}
tmp.getPusher().getPushContext().ackMsg(seqKey);
tmp.getPusher().getPushContext().getUnAckMsg().remove(seqKey);
logger.warn("remove expire clientAckContext, session:{}, topic:{}, seq:{}", tmp,
clientAckContext.getMsgs().get(0).getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), seqKey);
}
}
}
}, 1000, 5 * 1000, TimeUnit.MILLISECONDS);
}

private void initDownStreamMsgContextCleaner() {
eventMeshTCPServer.getScheduler().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Iterator<ClientGroupWrapper> cgwIterator = clientGroupMap.values().iterator();
while (cgwIterator.hasNext()) {
ClientGroupWrapper cgw = cgwIterator.next();
for (Map.Entry<String, DownStreamMsgContext> entry : cgw.getDownstreamMap().entrySet()) {
String seq = entry.getKey();
DownStreamMsgContext downStreamMsgContext = entry.getValue();
if (!downStreamMsgContext.isExpire()) {
continue;
}
cgw.getDownstreamMap().get(seq).ackMsg();
cgw.getDownstreamMap().remove(seq);
logger.warn("remove expire DownStreamMsgContext,group:{}, topic:{}, seq:{}", cgw.getGroupName(),
downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), seq);
downStreamMsgContext.ackMsg();
tmp.getPusher().getUnAckMsg().remove(seqKey);
logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp,
downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), seqKey);
}
}
}
Expand All @@ -420,7 +398,6 @@ public void run() {

public void init() throws Exception {
initSessionCleaner();
initSessionAckContextCleaner();
initDownStreamMsgContextCleaner();
logger.info("ClientSessionGroupMapping inited......");
}
Expand Down
Loading

0 comments on commit 1ca80fb

Please sign in to comment.