From 546765e5a9fd7bce1de99d6b50705220dc32293d Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Sun, 26 Sep 2021 22:07:40 -0400 Subject: [PATCH] [Issue #523] Adding Flow control RateLimiter support in Http Message Send Processor (#524) * [Issue #337] Fix HttpSubscriber startup issue * [Issue #337] test commit * [Issue #337] revert test commit * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook * [Issue #337] Address code review comment for Subscriber Demo App * Issue #523 adding FlowControl Ratelimiter support for Http message send processors * [Issue #523] Fixing the Eventmesh ratelimit error return code. Co-authored-by: j00441484 --- .../http/common/EventMeshRetCode.java | 3 +- eventmesh-runtime/conf/eventmesh.properties | 1 + .../runtime/boot/EventMeshHTTPServer.java | 39 +++++++++++-------- .../EventMeshHTTPConfiguration.java | 22 +++++++---- .../processor/BatchSendMessageProcessor.java | 15 ++++--- .../BatchSendMessageV2Processor.java | 7 ++-- .../http/processor/ReplyMessageProcessor.java | 19 +++++++-- .../processor/SendAsyncMessageProcessor.java | 15 ++++++- .../processor/SendSyncMessageProcessor.java | 19 ++++++--- 9 files changed, 93 insertions(+), 47 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java index 7795507cc8..116b0d1670 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java @@ -39,7 +39,8 @@ public enum EventMeshRetCode { EVENTMESH_SUBSCRIBE_ERR(17, "eventMesh subscribe err"), EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"), EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"), - EVENTMESH_ACL_ERR(20, "eventMesh acl err"); + EVENTMESH_ACL_ERR(20, "eventMesh acl err"), + EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(21, "eventMesh http msg send over the limit, "); private Integer retCode; diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 2f5bbfa069..b2f18b7f88 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -36,6 +36,7 @@ eventMesh.server.tcp.RebalanceIntervalInMills=30000 eventMesh.server.session.expiredInMills=60000 # flow control, include the global level and session level eventMesh.server.tcp.msgReqnumPerSecond=15000 +eventMesh.server.http.msgReqnumPerSecond=15000 eventMesh.server.session.upstreamBufferSize=20 # thread number about global scheduler diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index ba1c170485..f5055ba4af 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -17,35 +17,27 @@ package org.apache.eventmesh.runtime.boot; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; - import com.google.common.eventbus.EventBus; - +import com.google.common.util.concurrent.RateLimiter; import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.runtime.common.ServiceState; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; -import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor; -import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor; +import org.apache.eventmesh.runtime.core.protocol.http.processor.*; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager; import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest; import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + public class EventMeshHTTPServer extends AbstractHTTPServer { private EventMeshServer eventMeshServer; @@ -89,6 +81,10 @@ public EventMeshServer getEventMeshServer() { public ThreadPoolExecutor adminExecutor; + private RateLimiter msgRateLimiter; + + private RateLimiter batchRateLimiter; + public void shutdownThreadPool() throws Exception { batchMsgExecutor.shutdown(); adminExecutor.shutdown(); @@ -149,12 +145,23 @@ public ThreadPoolExecutor getAdminExecutor() { return adminExecutor; } + public RateLimiter getMsgRateLimiter() { + return msgRateLimiter; + } + + public RateLimiter getBatchRateLimiter() { + return batchRateLimiter; + } + public void init() throws Exception { logger.info("==================EventMeshHTTPServer Initialing=================="); super.init("eventMesh-http"); initThreadPool(); + msgRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshHttpMsgReqNumPerSecond); + batchRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshBatchMsgRequestNumPerSecond); + metrics = new HTTPMetricsServer(this); metrics.init(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index ffa6cbd9b2..05511b0368 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -18,8 +18,6 @@ package org.apache.eventmesh.runtime.configuration; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.RateLimiter; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.config.ConfigurationWrapper; @@ -28,8 +26,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public int httpServerPort = 10105; - public RateLimiter eventMeshServerBatchMsgNumLimiter = RateLimiter.create(20000); - public boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE; public int eventMeshServerBatchMsgThreadNum = 10; @@ -68,6 +64,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public boolean eventMeshServerUseTls = false; + public int eventMeshHttpMsgReqNumPerSecond = 15000; + + public int eventMeshBatchMsgRequestNumPerSecond = 20000; + public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) { super(configurationWrapper); } @@ -86,9 +86,9 @@ public void init() { eventMeshServerBatchMsgThreadNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgThreadNumStr)); } - String eventMeshServerBatchMsgNumLimiterStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_RATELIMITER); - if (StringUtils.isNotEmpty(eventMeshServerBatchMsgNumLimiterStr) && StringUtils.isNumeric(eventMeshServerBatchMsgNumLimiterStr)) { - eventMeshServerBatchMsgNumLimiter = RateLimiter.create(Double.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgNumLimiterStr))); + String eventMeshServerBatchMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND); + if (StringUtils.isNotEmpty(eventMeshServerBatchMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshServerBatchMsgReqNumPerSecondStr)) { + eventMeshBatchMsgRequestNumPerSecond = Integer.valueOf(eventMeshServerBatchMsgReqNumPerSecondStr); } String eventMeshServerBatchMsgBatchEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED); @@ -180,6 +180,11 @@ public void init() { if (StringUtils.isNotEmpty(eventMeshServerUseTlsStr)) { eventMeshServerUseTls = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerUseTlsStr)); } + + String eventMeshHttpMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND); + if (StringUtils.isNotEmpty(eventMeshHttpMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshHttpMsgReqNumPerSecondStr)) { + eventMeshHttpMsgReqNumPerSecond = Integer.valueOf(eventMeshHttpMsgReqNumPerSecondStr); + } } } @@ -189,7 +194,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_BATCHMSG_THREAD_NUM = "eventMesh.server.batchmsg.threads.num"; - public static String KEYS_EVENTMESH_BATCHMSG_RATELIMITER = "eventMesh.server.batchmsg.speed.ratelimiter"; + public static String KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND = "eventMesh.server.batchmsg.reqNumPerSecond"; public static String KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED = "eventMesh.server.batchmsg.batch.enabled"; @@ -227,5 +232,6 @@ static class ConfKeys { public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled"; + public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond"; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index cc7ea48af9..a8b7197816 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -17,18 +17,11 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; @@ -52,6 +45,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + public class BatchSendMessageProcessor implements HttpRequestProcessor { public Logger cmdLogger = LoggerFactory.getLogger("cmd"); @@ -104,7 +103,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } - if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter + if (!eventMeshHTTPServer.getBatchRateLimiter() .tryAcquire(Integer.valueOf(sendMessageBatchRequestBody.getSize()), EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageBatchResponseHeader, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java index 07a2dbeecd..10950ec7ec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java @@ -17,14 +17,11 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; @@ -48,6 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class BatchSendMessageV2Processor implements HttpRequestProcessor { public Logger cmdLogger = LoggerFactory.getLogger("cmd"); @@ -122,7 +121,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } - if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter + if (!eventMeshHTTPServer.getBatchRateLimiter() .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageBatchV2ResponseHeader, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 6b86b2ecd4..fb780dba97 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -17,19 +17,15 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.Map; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; -import org.apache.eventmesh.common.LiteMessage; import org.apache.eventmesh.common.command.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageRequestBody; import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageResponseBody; @@ -50,6 +46,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.concurrent.TimeUnit; + public class ReplyMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -104,6 +103,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + replyMessageResponseHeader, + ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = replyMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index b79b587a23..d64ef6bcec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -22,7 +22,6 @@ import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; @@ -47,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class SendAsyncMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -129,6 +130,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = sendMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index edb1a5837b..5aa623d125 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -18,13 +18,8 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; import com.alibaba.fastjson.JSON; - import io.netty.channel.ChannelHandlerContext; import io.openmessaging.api.Message; -import io.openmessaging.api.OnExceptionContext; -import io.openmessaging.api.SendCallback; -import io.openmessaging.api.SendResult; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.common.Constants; @@ -51,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public class SendSyncMessageProcessor implements HttpRequestProcessor { public Logger messageLogger = LoggerFactory.getLogger("message"); @@ -130,6 +127,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } + // control flow rate limit + if (!eventMeshHTTPServer.getMsgRateLimiter() + .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) { + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg())); + eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard(); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String producerGroup = sendMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);