Skip to content

Commit

Permalink
[Issue #523] Adding Flow control RateLimiter support in Http Message …
Browse files Browse the repository at this point in the history
…Send Processor (#524)

* [Issue #337] Fix HttpSubscriber startup issue

* [Issue #337] test commit

* [Issue #337] revert test commit

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Address code review comment for Subscriber Demo App

* Issue #523 adding FlowControl Ratelimiter support for Http message send processors

* [Issue #523] Fixing the Eventmesh ratelimit error return code.

Co-authored-by: j00441484 <[email protected]>
  • Loading branch information
jinrongluo and j00441484 authored Sep 27, 2021
1 parent 9c1d21f commit 546765e
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public enum EventMeshRetCode {
EVENTMESH_SUBSCRIBE_ERR(17, "eventMesh subscribe err"),
EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"),
EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"),
EVENTMESH_ACL_ERR(20, "eventMesh acl err");
EVENTMESH_ACL_ERR(20, "eventMesh acl err"),
EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(21, "eventMesh http msg send over the limit, ");

private Integer retCode;

Expand Down
1 change: 1 addition & 0 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ eventMesh.server.tcp.RebalanceIntervalInMills=30000
eventMesh.server.session.expiredInMills=60000
# flow control, include the global level and session level
eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20

# thread number about global scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,27 @@

package org.apache.eventmesh.runtime.boot;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import com.google.common.eventbus.EventBus;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.*;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class EventMeshHTTPServer extends AbstractHTTPServer {

private EventMeshServer eventMeshServer;
Expand Down Expand Up @@ -89,6 +81,10 @@ public EventMeshServer getEventMeshServer() {

public ThreadPoolExecutor adminExecutor;

private RateLimiter msgRateLimiter;

private RateLimiter batchRateLimiter;

public void shutdownThreadPool() throws Exception {
batchMsgExecutor.shutdown();
adminExecutor.shutdown();
Expand Down Expand Up @@ -149,12 +145,23 @@ public ThreadPoolExecutor getAdminExecutor() {
return adminExecutor;
}

public RateLimiter getMsgRateLimiter() {
return msgRateLimiter;
}

public RateLimiter getBatchRateLimiter() {
return batchRateLimiter;
}

public void init() throws Exception {
logger.info("==================EventMeshHTTPServer Initialing==================");
super.init("eventMesh-http");

initThreadPool();

msgRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshHttpMsgReqNumPerSecond);
batchRateLimiter = RateLimiter.create(eventMeshHttpConfiguration.eventMeshBatchMsgRequestNumPerSecond);

metrics = new HTTPMetricsServer(this);
metrics.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.eventmesh.runtime.configuration;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigurationWrapper;
Expand All @@ -28,8 +26,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {

public int httpServerPort = 10105;

public RateLimiter eventMeshServerBatchMsgNumLimiter = RateLimiter.create(20000);

public boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE;

public int eventMeshServerBatchMsgThreadNum = 10;
Expand Down Expand Up @@ -68,6 +64,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {

public boolean eventMeshServerUseTls = false;

public int eventMeshHttpMsgReqNumPerSecond = 15000;

public int eventMeshBatchMsgRequestNumPerSecond = 20000;

public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) {
super(configurationWrapper);
}
Expand All @@ -86,9 +86,9 @@ public void init() {
eventMeshServerBatchMsgThreadNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgThreadNumStr));
}

String eventMeshServerBatchMsgNumLimiterStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_RATELIMITER);
if (StringUtils.isNotEmpty(eventMeshServerBatchMsgNumLimiterStr) && StringUtils.isNumeric(eventMeshServerBatchMsgNumLimiterStr)) {
eventMeshServerBatchMsgNumLimiter = RateLimiter.create(Double.valueOf(StringUtils.deleteWhitespace(eventMeshServerBatchMsgNumLimiterStr)));
String eventMeshServerBatchMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND);
if (StringUtils.isNotEmpty(eventMeshServerBatchMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshServerBatchMsgReqNumPerSecondStr)) {
eventMeshBatchMsgRequestNumPerSecond = Integer.valueOf(eventMeshServerBatchMsgReqNumPerSecondStr);
}

String eventMeshServerBatchMsgBatchEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED);
Expand Down Expand Up @@ -180,6 +180,11 @@ public void init() {
if (StringUtils.isNotEmpty(eventMeshServerUseTlsStr)) {
eventMeshServerUseTls = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerUseTlsStr));
}

String eventMeshHttpMsgReqNumPerSecondStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND);
if (StringUtils.isNotEmpty(eventMeshHttpMsgReqNumPerSecondStr) && StringUtils.isNumeric(eventMeshHttpMsgReqNumPerSecondStr)) {
eventMeshHttpMsgReqNumPerSecond = Integer.valueOf(eventMeshHttpMsgReqNumPerSecondStr);
}
}
}

Expand All @@ -189,7 +194,7 @@ static class ConfKeys {

public static String KEYS_EVENTMESH_BATCHMSG_THREAD_NUM = "eventMesh.server.batchmsg.threads.num";

public static String KEYS_EVENTMESH_BATCHMSG_RATELIMITER = "eventMesh.server.batchmsg.speed.ratelimiter";
public static String KEYS_EVENTMESH_BATCHMSG_REQ_NUM_PER_SECOND = "eventMesh.server.batchmsg.reqNumPerSecond";

public static String KEYS_EVENTMESH_BATCHMSG_BATCH_ENABLED = "eventMesh.server.batchmsg.batch.enabled";

Expand Down Expand Up @@ -227,5 +232,6 @@ static class ConfKeys {

public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled";

public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
Expand All @@ -52,6 +45,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class BatchSendMessageProcessor implements HttpRequestProcessor {

public Logger cmdLogger = LoggerFactory.getLogger("cmd");
Expand Down Expand Up @@ -104,7 +103,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter
if (!eventMeshHTTPServer.getBatchRateLimiter()
.tryAcquire(Integer.valueOf(sendMessageBatchRequestBody.getSize()), EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchResponseHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
Expand All @@ -48,6 +45,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class BatchSendMessageV2Processor implements HttpRequestProcessor {

public Logger cmdLogger = LoggerFactory.getLogger("cmd");
Expand Down Expand Up @@ -122,7 +121,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
}
}

if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter
if (!eventMeshHTTPServer.getBatchRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchV2ResponseHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.Map;

import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.LiteMessage;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageResponseBody;
Expand All @@ -50,6 +46,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ReplyMessageProcessor implements HttpRequestProcessor {

public Logger messageLogger = LoggerFactory.getLogger("message");
Expand Down Expand Up @@ -104,6 +103,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
replyMessageResponseHeader,
ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard();
asyncContext.onComplete(responseEventMeshCommand);
return;
}

String producerGroup = replyMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
Expand All @@ -47,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class SendAsyncMessageProcessor implements HttpRequestProcessor {

public Logger messageLogger = LoggerFactory.getLogger("message");
Expand Down Expand Up @@ -129,6 +130,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
}
}

// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard();
asyncContext.onComplete(responseEventMeshCommand);
return;
}

String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.alibaba.fastjson.JSON;

import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
Expand All @@ -51,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class SendSyncMessageProcessor implements HttpRequestProcessor {

public Logger messageLogger = LoggerFactory.getLogger("message");
Expand Down Expand Up @@ -130,6 +127,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
}
}

// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard();
asyncContext.onComplete(responseEventMeshCommand);
return;
}

String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

Expand Down

0 comments on commit 546765e

Please sign in to comment.