diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ProtocolTransportObject.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ProtocolTransportObject.java
new file mode 100644
index 0000000000..cf660a35cb
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ProtocolTransportObject.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.common;
+
+import java.io.Serializable;
+
+/**
+ *
+ * - Tcp transport object{@link org.apache.eventmesh.common.protocol.tcp.Package}
+ * - Http transport object{@link org.apache.eventmesh.common.command.HttpCommand}
+ *
+ */
+public interface ProtocolTransportObject extends Serializable {
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
index b6c61d68df..c5fefcbe8e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
@@ -18,8 +18,10 @@
package org.apache.eventmesh.common.command;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.body.BaseResponseBody;
import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.utils.JsonUtils;
@@ -38,9 +40,9 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
-public class HttpCommand {
+public class HttpCommand implements ProtocolTransportObject {
- private static AtomicLong requestId = new AtomicLong(0);
+ private static final AtomicLong requestId = new AtomicLong(0);
private long opaque;
@@ -90,7 +92,7 @@ public HttpCommand createHttpCommandResponse(Header header,
return response;
}
- public HttpCommand createHttpCommandResponse(Integer retCode, String retMsg) {
+ public HttpCommand createHttpCommandResponse(EventMeshRetCode eventMeshRetCode) {
if (StringUtils.isBlank(requestCode)) {
return null;
}
@@ -101,8 +103,8 @@ public HttpCommand createHttpCommandResponse(Integer retCode, String retMsg) {
baseResponseHeader.setCode(requestCode);
response.setHeader(baseResponseHeader);
BaseResponseBody baseResponseBody = new BaseResponseBody();
- baseResponseBody.setRetCode(retCode);
- baseResponseBody.setRetMsg(retMsg);
+ baseResponseBody.setRetCode(eventMeshRetCode.getRetCode());
+ baseResponseBody.setRetMsg(eventMeshRetCode.getErrMsg());
response.setBody(baseResponseBody);
response.setCmdType(CmdType.RES);
response.setResTime(System.currentTimeMillis());
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
index d2cde02193..d10881716b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
@@ -17,7 +17,9 @@
package org.apache.eventmesh.common.protocol.tcp;
-public class Package {
+import org.apache.eventmesh.common.ProtocolTransportObject;
+
+public class Package implements ProtocolTransportObject {
private Header header;
private Object body;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-api/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-api/build.gradle
index 8eb049dd2e..94459679ff 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-api/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-api/build.gradle
@@ -26,4 +26,10 @@ dependencies {
testImplementation "io.cloudevents:cloudevents-core"
testImplementation "junit:junit"
+
+ compileOnly 'org.projectlombok:lombok:1.18.22'
+ annotationProcessor 'org.projectlombok:lombok:1.18.22'
+
+ testCompileOnly 'org.projectlombok:lombok:1.18.22'
+ testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
}
\ No newline at end of file
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolAdaptor.java
index 9505df9583..e8b854effd 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolAdaptor.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolAdaptor.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.protocol.api;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.spi.EventMeshExtensionType;
@@ -35,7 +36,7 @@
* @since 1.3.0
*/
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.PROTOCOL)
-public interface ProtocolAdaptor {
+public interface ProtocolAdaptor {
/**
* transform protocol to {@link CloudEvent}.
@@ -59,7 +60,7 @@ public interface ProtocolAdaptor {
* @param cloudEvent clout event
* @return target protocol
*/
- Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException;
+ ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException;
/**
* Get protocol type.
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolPluginFactory.java b/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolPluginFactory.java
index f9208ee9b6..fd0de78707 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolPluginFactory.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-api/src/main/java/org/apache/eventmesh/protocol/api/ProtocolPluginFactory.java
@@ -17,20 +17,23 @@
package org.apache.eventmesh.protocol.api;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.experimental.UtilityClass;
+
/**
* A factory to get Protocol plugin instance.
*
* @since 1.3.0
*/
-public enum ProtocolPluginFactory {
- ;
+@UtilityClass
+public class ProtocolPluginFactory {
- private static final Map PROTOCOL_ADAPTOR_MAP =
+ private static final Map> PROTOCOL_ADAPTOR_MAP =
new ConcurrentHashMap<>(16);
/**
@@ -40,8 +43,9 @@ public enum ProtocolPluginFactory {
* @return protocol adaptor
* @throws IllegalArgumentException if protocol not found
*/
- public static ProtocolAdaptor getProtocolAdaptor(String protocolType) {
- ProtocolAdaptor protocolAdaptor = PROTOCOL_ADAPTOR_MAP.computeIfAbsent(
+ @SuppressWarnings("unchecked")
+ public static ProtocolAdaptor getProtocolAdaptor(String protocolType) {
+ ProtocolAdaptor protocolAdaptor = PROTOCOL_ADAPTOR_MAP.computeIfAbsent(
protocolType,
(type) -> EventMeshExtensionFactory.getExtension(ProtocolAdaptor.class, type)
);
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
index 52c53212d7..b9bb8e20b9 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
@@ -20,13 +20,13 @@
import io.cloudevents.CloudEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
-
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver;
@@ -41,10 +41,11 @@
*
* @since 1.3.0
*/
-public class CloudEventsProtocolAdaptor implements ProtocolAdaptor {
+public class CloudEventsProtocolAdaptor
+ implements ProtocolAdaptor {
@Override
- public CloudEvent toCloudEvent(T cloudEvent) throws ProtocolHandleException {
+ public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws ProtocolHandleException {
if (cloudEvent instanceof Package) {
Header header = ((Package) cloudEvent).getHeader();
@@ -84,15 +85,18 @@ private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventm
}
@Override
- public List toBatchCloudEvent(T protocol) throws ProtocolHandleException {
+ public List toBatchCloudEvent(ProtocolTransportObject protocol)
+ throws ProtocolHandleException {
return null;
}
@Override
- public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
+ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();
if (StringUtils.equals("http", protocolDesc)) {
- return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
+ // todo: return command, set cloudEvent.getData() to content?
+ return null;
+// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
pkg.setBody(cloudEvent);
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-openmessage/src/main/java/org/apache/eventmesh/protocol/openmessage/OpenMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-openmessage/src/main/java/org/apache/eventmesh/protocol/openmessage/OpenMessageProtocolAdaptor.java
index 6e00275e0f..1492a28453 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-openmessage/src/main/java/org/apache/eventmesh/protocol/openmessage/OpenMessageProtocolAdaptor.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-openmessage/src/main/java/org/apache/eventmesh/protocol/openmessage/OpenMessageProtocolAdaptor.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.protocol.openmessage;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
@@ -33,20 +34,20 @@
*
* @since 1.3.0
*/
-public class OpenMessageProtocolAdaptor implements ProtocolAdaptor {
+public class OpenMessageProtocolAdaptor implements ProtocolAdaptor {
@Override
- public CloudEvent toCloudEvent(T message) {
+ public CloudEvent toCloudEvent(ProtocolTransportObject message) {
return null;
}
@Override
- public List toBatchCloudEvent(T protocol) throws ProtocolHandleException {
+ public List toBatchCloudEvent(ProtocolTransportObject protocol) throws ProtocolHandleException {
return null;
}
@Override
- public Object fromCloudEvent(CloudEvent cloudEvent) {
+ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) {
return null;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 83cba95d8a..9b08724daa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -17,16 +17,39 @@
package org.apache.eventmesh.runtime.boot;
+import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.command.HttpCommand;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
+import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.apache.eventmesh.runtime.common.Pair;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
@@ -44,6 +67,7 @@
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
@@ -61,27 +85,6 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.ThreadPoolFactory;
-import org.apache.eventmesh.common.command.HttpCommand;
-import org.apache.eventmesh.common.protocol.http.body.Body;
-import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
-import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
-import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
-import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.eventmesh.common.protocol.http.header.Header;
-import org.apache.eventmesh.runtime.common.Pair;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.apache.eventmesh.runtime.util.RemotingHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public abstract class AbstractHTTPServer extends AbstractRemotingServer {
public Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
@@ -97,55 +100,41 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
private boolean useTLS;
public ThreadPoolExecutor asyncContextCompleteHandler =
- ThreadPoolFactory.createThreadPoolExecutor(10, 10, "eventMesh-http-asyncContext-");
+ ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
static {
DiskAttribute.deleteOnExitTemporaryFile = false;
}
- protected HashMap> processorTable =
- new HashMap>(64);
+ protected final Map>
+ processorTable = new HashMap<>(64);
public AbstractHTTPServer(int port, boolean useTLS) {
this.port = port;
this.useTLS = useTLS;
}
- public Map parseHTTPHeader(HttpRequest fullReq) {
- Map headerParam = new HashMap<>();
- for (String key : fullReq.headers().names()) {
- if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key)
- || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key)
- || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) {
- continue;
- }
- headerParam.put(key, fullReq.headers().get(key));
- }
- return headerParam;
- }
-
- public void sendError(ChannelHandlerContext ctx,
- HttpResponseStatus status) {
- FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- status);
- response.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN +
- "; charset=" + EventMeshConstants.DEFAULT_CHARSET);
- response.headers().add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
- response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ public void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
+ HttpHeaders responseHeaders = response.headers();
+ responseHeaders.add(
+ HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)
+ );
+ responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+ responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// todo server span end with error, record status, we should get channel here to get span in channel's context in async call..
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
- public void sendResponse(ChannelHandlerContext ctx,
- DefaultFullHttpResponse response) {
+ public void sendResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response) {
// todo end server span, we should get channel here to get span in channel's context in async call.
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
- public void operationComplete(ChannelFuture f) throws Exception {
+ public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
- httpLogger.warn("send response to [{}] fail, will close this channel", RemotingHelper.parseChannelRemoteAddr(f.channel()));
+ httpLogger.warn("send response to [{}] fail, will close this channel",
+ RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
- return;
}
}
});
@@ -158,9 +147,9 @@ public void start() throws Exception {
ServerBootstrap b = new ServerBootstrap();
SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext() : null;
b.group(this.bossGroup, this.workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new HttpsServerInitializer(sslContext))
- .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new HttpsServerInitializer(sslContext))
+ .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
try {
httpServerLogger.info("HTTPServer[port={}] started......", this.port);
ChannelFuture future = b.bind(this.port).sync();
@@ -172,11 +161,10 @@ public void start() throws Exception {
} catch (Exception e1) {
httpServerLogger.error("HTTPServer shutdown Err!", e);
}
- return;
}
};
- Thread t = new Thread(r, "eventMesh-http-server");
+ Thread t = new Thread(r, "EventMesh-http-server");
t.start();
started.compareAndSet(false, true);
}
@@ -196,62 +184,34 @@ public void registerProcessor(Integer requestCode, HttpRequestProcessor processo
Preconditions.checkState(ObjectUtils.allNotNull(requestCode), "requestCode can't be null");
Preconditions.checkState(ObjectUtils.allNotNull(processor), "processor can't be null");
Preconditions.checkState(ObjectUtils.allNotNull(executor), "executor can't be null");
- Pair pair = new Pair(processor, executor);
- this.processorTable.put(requestCode, pair);
+ Pair pair = new Pair<>(processor, executor);
+ this.processorTable.put(requestCode.toString(), pair);
}
class HTTPHandler extends SimpleChannelInboundHandler {
@Override
- protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
- HttpPostRequestDecoder decoder = null;
+ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) {
// todo start server span, we should get channel here to put span in channel's context in async call.
try {
- if (!httpRequest.decoderResult().isSuccess()) {
- sendError(ctx, HttpResponseStatus.BAD_REQUEST);
+ preProcessHTTPRequestHeader(ctx, httpRequest);
+ final HttpResponseStatus errorStatus = validateHTTPRequest(httpRequest);
+ if (errorStatus != null) {
+ sendError(ctx, errorStatus);
return;
}
+ metrics.summaryMetrics.recordHTTPRequest();
final HttpCommand requestCommand = new HttpCommand();
// todo record command opaque in span.
- httpRequest.headers().set(ProtocolKey.ClientInstanceKey.IP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
- String protocolVersion = StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.VERSION));
- if (StringUtils.isBlank(protocolVersion)) {
- protocolVersion = ProtocolVersion.V1.getVersion();
- httpRequest.headers().set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
- }
-
- metrics.summaryMetrics.recordHTTPRequest();
-
- long bodyDecodeStart = System.currentTimeMillis();
-
- Map bodyMap = new HashMap<>();
-
- if (httpRequest.method() == HttpMethod.GET) {
- QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());
- getDecoder.parameters().entrySet().forEach(entry -> {
- bodyMap.put(entry.getKey(), entry.getValue().get(0));
- });
- } else if (httpRequest.method() == HttpMethod.POST) {
- decoder = new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
- List parmList = decoder.getBodyHttpDatas();
- for (InterfaceHttpData parm : parmList) {
- if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
- Attribute data = (Attribute) parm;
- bodyMap.put(data.getName(), data.getValue());
- }
- }
- } else {
- sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
- return;
- }
+ final Map bodyMap = parseHttpRequestBody(httpRequest);
- metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);
+ // todo: split get and post, use different submethod to process
String requestCode =
- (httpRequest.method() == HttpMethod.POST) ? StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.REQUEST_CODE))
- : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
+ (httpRequest.method() == HttpMethod.POST)
+ ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE)
+ : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
requestCommand.setHttpMethod(httpRequest.method().name());
requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());
@@ -260,23 +220,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
HttpCommand responseCommand = null;
- if (!ProtocolVersion.contains(protocolVersion)) {
- responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg());
- sendResponse(ctx, responseCommand.httpResponse());
- return;
- }
-
if (StringUtils.isBlank(requestCode)
- || !StringUtils.isNumeric(requestCode)
- || !RequestCode.contains(Integer.valueOf(requestCode))
- || !processorTable.containsKey(Integer.valueOf(requestCode))) {
- responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getRetCode(), EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg());
- sendResponse(ctx, responseCommand.httpResponse());
- return;
- }
-
- if (!started.get()) {
- responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_STOP.getRetCode(), EventMeshRetCode.EVENTMESH_STOP.getErrMsg());
+ || !processorTable.containsKey(requestCode)
+ || !RequestCode.contains(Integer.valueOf(requestCode))) {
+ responseCommand =
+ requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
sendResponse(ctx, responseCommand.httpResponse());
return;
}
@@ -285,7 +233,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
requestCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest)));
requestCommand.setBody(Body.buildBody(requestCode, bodyMap));
} catch (Exception e) {
- responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 3));
+ responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR);
sendResponse(ctx, responseCommand.httpResponse());
return;
}
@@ -294,27 +242,25 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
httpLogger.debug("{}", requestCommand);
}
- AsyncContext asyncContext = new AsyncContext(requestCommand, responseCommand, asyncContextCompleteHandler);
+ AsyncContext asyncContext =
+ new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler);
processEventMeshRequest(ctx, asyncContext);
} catch (Exception ex) {
- httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", ex);
+ httpServerLogger.error("AbstractHTTPServer.HTTPHandler.channelRead0 err", ex);
// todo span end with exception.
- } finally {
- try {
- decoder.destroy();
- } catch (Exception e) {
- }
}
}
public void processEventMeshRequest(final ChannelHandlerContext ctx,
final AsyncContext asyncContext) {
- final Pair choosed = processorTable.get(Integer.valueOf(asyncContext.getRequest().getRequestCode()));
+ final HttpCommand request = asyncContext.getRequest();
+ final Pair choosed = processorTable.get(request.getRequestCode());
try {
choosed.getObject2().submit(() -> {
try {
if (choosed.getObject1().rejectRequest()) {
- HttpCommand responseCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getRetCode(), EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg());
+ HttpCommand responseCommand =
+ request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
asyncContext.onComplete(responseCommand);
if (asyncContext.isComplete()) {
if (httpLogger.isDebugEnabled()) {
@@ -326,11 +272,12 @@ public void processEventMeshRequest(final ChannelHandlerContext ctx,
}
choosed.getObject1().processRequest(ctx, asyncContext);
- if (asyncContext == null || !asyncContext.isComplete()) {
+ if (!asyncContext.isComplete()) {
return;
}
- metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ metrics.summaryMetrics.recordHTTPReqResTimeCost(
+ System.currentTimeMillis() - request.getReqTime());
if (httpLogger.isDebugEnabled()) {
httpLogger.debug("{}", asyncContext.getResponse());
@@ -342,13 +289,14 @@ public void processEventMeshRequest(final ChannelHandlerContext ctx,
}
});
} catch (RejectedExecutionException re) {
- HttpCommand responseCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.OVERLOAD.getRetCode(), EventMeshRetCode.OVERLOAD.getErrMsg());
+ HttpCommand responseCommand = request.createHttpCommandResponse(EventMeshRetCode.OVERLOAD);
asyncContext.onComplete(responseCommand);
metrics.summaryMetrics.recordHTTPDiscard();
- metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - responseCommand.getReqTime());
+ metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
try {
sendResponse(ctx, asyncContext.getResponse().httpResponse());
} catch (Exception e) {
+ // ignore
}
}
}
@@ -360,14 +308,18 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (null != cause) cause.printStackTrace();
- if (null != ctx) ctx.close();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (null != cause) {
+ logger.error("", cause);
+ }
+ if (null != ctx) {
+ ctx.close();
+ }
}
}
class HttpConnectionHandler extends ChannelDuplexHandler {
- public AtomicInteger connections = new AtomicInteger(0);
+ public final AtomicInteger connections = new AtomicInteger(0);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@@ -384,8 +336,9 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
int c = connections.incrementAndGet();
if (c > 20000) {
- httpServerLogger.warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client(20000) connect " +
- "this eventMesh server");
+ httpServerLogger
+ .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress,
+ "too many client(20000) connect this eventMesh server");
ctx.close();
return;
}
@@ -396,19 +349,17 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connections.decrementAndGet();
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
super.channelInactive(ctx);
}
-
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}", remoteAddress, evt.getClass()
- .getName());
+ httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}",
+ remoteAddress, evt.getClass().getName());
ctx.close();
}
}
@@ -419,14 +370,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
class HttpsServerInitializer extends ChannelInitializer {
- private SSLContext sslContext;
+ private final SSLContext sslContext;
public HttpsServerInitializer(SSLContext sslContext) {
this.sslContext = sslContext;
}
@Override
- protected void initChannel(SocketChannel channel) throws Exception {
+ protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
@@ -436,10 +387,92 @@ protected void initChannel(SocketChannel channel) throws Exception {
pipeline.addFirst("ssl", new SslHandler(sslEngine));
}
pipeline.addLast(new HttpRequestDecoder(),
- new HttpResponseEncoder(),
- new HttpConnectionHandler(),
- new HttpObjectAggregator(Integer.MAX_VALUE),
- new HTTPHandler());
+ new HttpResponseEncoder(),
+ new HttpConnectionHandler(),
+ new HttpObjectAggregator(Integer.MAX_VALUE),
+ new HTTPHandler());
+ }
+ }
+
+ private Map parseHTTPHeader(HttpRequest fullReq) {
+ Map headerParam = new HashMap<>();
+ for (String key : fullReq.headers().names()) {
+ if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key)
+ || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key)
+ || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) {
+ continue;
+ }
+ headerParam.put(key, fullReq.headers().get(key));
+ }
+ return headerParam;
+ }
+
+ /**
+ * Validate request, return error status.
+ *
+ * @param httpRequest
+ * @return if request is validated return null else return error status
+ */
+ private HttpResponseStatus validateHTTPRequest(HttpRequest httpRequest) {
+ if (!started.get()) {
+ return HttpResponseStatus.SERVICE_UNAVAILABLE;
+ }
+ if (!httpRequest.decoderResult().isSuccess()) {
+ return HttpResponseStatus.BAD_REQUEST;
+ }
+ if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) {
+ return HttpResponseStatus.METHOD_NOT_ALLOWED;
+ }
+ final String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
+ if (!ProtocolVersion.contains(protocolVersion)) {
+ return HttpResponseStatus.BAD_REQUEST;
+ }
+ return null;
+ }
+
+ /**
+ * Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}.
+ *
+ * @param ctx
+ * @param httpRequest
+ */
+ private void preProcessHTTPRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+ HttpHeaders requestHeaders = httpRequest.headers();
+ requestHeaders.set(ProtocolKey.ClientInstanceKey.IP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
+ if (StringUtils.isBlank(protocolVersion)) {
+ requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
}
}
+
+ /**
+ * Parse request body to map
+ *
+ * @param httpRequest
+ * @return
+ */
+ private Map parseHttpRequestBody(HttpRequest httpRequest) throws IOException {
+ final long bodyDecodeStart = System.currentTimeMillis();
+ Map httpRequestBody = new HashMap<>();
+
+ if (HttpMethod.GET.equals(httpRequest.method())) {
+ QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());
+ getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
+ } else if (HttpMethod.POST.equals(httpRequest.method())) {
+ HttpPostRequestDecoder decoder =
+ new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
+ for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
+ if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+ Attribute data = (Attribute) parm;
+ httpRequestBody.put(data.getName(), data.getValue());
+ }
+ }
+ decoder.destroy();
+ }
+ metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);
+ return httpRequestBody;
+ }
+
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index f5055ba4af..ce3e2cc10d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -17,15 +17,21 @@
package org.apache.eventmesh.runtime.boot;
-import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.RateLimiter;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.*;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.ReplyMessageProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.SendAsyncMessageProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessageProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
@@ -38,6 +44,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.RateLimiter;
+
public class EventMeshHTTPServer extends AbstractHTTPServer {
private EventMeshServer eventMeshServer;
@@ -46,9 +55,11 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private EventMeshHTTPConfiguration eventMeshHttpConfiguration;
- public final ConcurrentHashMap localConsumerGroupMapping = new ConcurrentHashMap<>();
+ public final ConcurrentHashMap localConsumerGroupMapping =
+ new ConcurrentHashMap<>();
- public final ConcurrentHashMap> localClientInfoMapping = new ConcurrentHashMap<>();
+ public final ConcurrentHashMap> localClientInfoMapping =
+ new ConcurrentHashMap<>();
public EventMeshHTTPServer(EventMeshServer eventMeshServer,
EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
@@ -96,29 +107,45 @@ public void shutdownThreadPool() throws Exception {
public void initThreadPool() throws Exception {
- BlockingQueue batchMsgThreadPoolQueue = new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);
- batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,
- eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, batchMsgThreadPoolQueue, "eventMesh-batchMsg-", true);
-
- BlockingQueue sendMsgThreadPoolQueue = new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerSendMsgBlockQSize);
- sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum,
- eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum, sendMsgThreadPoolQueue, "eventMesh-sendMsg-", true);
-
- BlockingQueue pushMsgThreadPoolQueue = new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerPushMsgBlockQSize);
- pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum,
- eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum, pushMsgThreadPoolQueue, "eventMesh-pushMsg-", true);
-
- BlockingQueue clientManageThreadPoolQueue = new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerClientManageBlockQSize);
- clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum,
- eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum, clientManageThreadPoolQueue, "eventMesh-clientManage-", true);
+ BlockingQueue batchMsgThreadPoolQueue =
+ new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);
+ batchMsgExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, batchMsgThreadPoolQueue,
+ "eventMesh-batchMsg-", true);
+
+ BlockingQueue sendMsgThreadPoolQueue =
+ new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerSendMsgBlockQSize);
+ sendMsgExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerSendMsgThreadNum, sendMsgThreadPoolQueue,
+ "eventMesh-sendMsg-", true);
+
+ BlockingQueue pushMsgThreadPoolQueue =
+ new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerPushMsgBlockQSize);
+ pushMsgExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerPushMsgThreadNum, pushMsgThreadPoolQueue,
+ "eventMesh-pushMsg-", true);
+
+ BlockingQueue clientManageThreadPoolQueue =
+ new LinkedBlockingQueue(eventMeshHttpConfiguration.eventMeshServerClientManageBlockQSize);
+ clientManageExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerClientManageThreadNum, clientManageThreadPoolQueue,
+ "eventMesh-clientManage-", true);
BlockingQueue adminThreadPoolQueue = new LinkedBlockingQueue(50);
- adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerAdminThreadNum,
- eventMeshHttpConfiguration.eventMeshServerAdminThreadNum, adminThreadPoolQueue, "eventMesh-admin-", true);
+ adminExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerAdminThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerAdminThreadNum, adminThreadPoolQueue, "eventMesh-admin-",
+ true);
BlockingQueue replyMessageThreadPoolQueue = new LinkedBlockingQueue(100);
- replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum,
- eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum, replyMessageThreadPoolQueue, "eventMesh-replyMsg-", true);
+ replyMsgExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum,
+ eventMeshHttpConfiguration.eventMeshServerReplyMsgThreadNum, replyMessageThreadPoolQueue,
+ "eventMesh-replyMsg-", true);
}
public ThreadPoolExecutor getBatchMsgExecutor() {
@@ -210,20 +237,17 @@ public void shutdown() throws Exception {
public void registerHTTPRequestProcessor() {
BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this);
- registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(),
- batchSendMessageProcessor, batchMsgExecutor);
+ registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), batchSendMessageProcessor, batchMsgExecutor);
BatchSendMessageV2Processor batchSendMessageV2Processor = new BatchSendMessageV2Processor(this);
- registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(),
- batchSendMessageV2Processor, batchMsgExecutor);
+ registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), batchSendMessageV2Processor,
+ batchMsgExecutor);
SendSyncMessageProcessor sendSyncMessageProcessor = new SendSyncMessageProcessor(this);
- registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(),
- sendSyncMessageProcessor, sendMsgExecutor);
+ registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), sendSyncMessageProcessor, sendMsgExecutor);
SendAsyncMessageProcessor sendAsyncMessageProcessor = new SendAsyncMessageProcessor(this);
- registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(),
- sendAsyncMessageProcessor, sendMsgExecutor);
+ registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), sendAsyncMessageProcessor, sendMsgExecutor);
AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this);
registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor, adminExecutor);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java
index dac27461b8..6feaf9ae22 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java
@@ -28,8 +28,6 @@
public class AdminMetricsProcessor implements HttpRequestProcessor {
- public Logger cmdLogger = LoggerFactory.getLogger("cmd");
-
private EventMeshHTTPServer eventMeshHTTPServer;
public AdminMetricsProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java
index 775a7c1730..ca6212afb0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java
@@ -17,8 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import io.netty.channel.ChannelHandlerContext;
-
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
@@ -28,9 +26,12 @@
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandlerContext;
+
public class AdminShutdownProcessor implements HttpRequestProcessor {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
@@ -45,14 +46,14 @@ public AdminShutdownProcessor(EventMeshServer eventMeshServer) {
public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) throws Exception {
HttpCommand responseEventMeshCommand;
- cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
eventMeshServer.shutdown();
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index c38ff5dfb2..2787142e7d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -17,17 +17,16 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import io.cloudevents.CloudEvent;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
-import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
-import org.apache.eventmesh.common.protocol.http.body.message.*;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2ResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
@@ -44,13 +43,19 @@
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.netty.channel.ChannelHandlerContext;
+
public class BatchSendMessageV2Processor implements HttpRequestProcessor {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
@@ -66,66 +71,89 @@ public BatchSendMessageV2Processor(EventMeshHTTPServer eventMeshHTTPServer) {
public Logger batchMessageLogger = LoggerFactory.getLogger("batchMessage");
@Override
- public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) throws Exception {
+ public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext)
+ throws Exception {
HttpCommand responseEventMeshCommand;
+ final HttpCommand request = asyncContext.getRequest();
+ final Integer requestCode = Integer.valueOf(request.getRequestCode());
- cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(requestCode),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader = (SendMessageBatchV2RequestHeader) asyncContext.getRequest().getHeader();
+ SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader =
+ (SendMessageBatchV2RequestHeader) asyncContext.getRequest().getHeader();
String protocolType = sendMessageBatchV2RequestHeader.getProtocolType();
- ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor httpCommandProtocolAdaptor =
+ ProtocolPluginFactory.getProtocolAdaptor(protocolType);
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
SendMessageBatchV2ResponseHeader sendMessageBatchV2ResponseHeader =
- SendMessageBatchV2ResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
- IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
-
+ SendMessageBatchV2ResponseHeader.buildHeader(
+ requestCode,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+ IPUtil.getLocalAddress(),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC
+ );
+
+ // todo: use validate processor to check
//validate event
- if (event != null
- || StringUtils.isBlank(event.getId())
- || event.getSource() != null
- || event.getSpecVersion() != null
- || StringUtils.isBlank(event.getType())
- || StringUtils.isBlank(event.getSubject())) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ if (StringUtils.isBlank(event.getId())
+ || event.getSource() != null
+ || event.getSpecVersion() != null
+ || StringUtils.isBlank(event.getType())
+ || StringUtils.isBlank(event.getSubject())) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString();
- String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString();
- String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
+ String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
+ .toString();
+ String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
+ .toString();
+ String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
+ .toString();
//validate event-extension
if (StringUtils.isBlank(idc)
- || StringUtils.isBlank(pid)
- || !StringUtils.isNumeric(pid)
- || StringUtils.isBlank(sys)) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ || StringUtils.isBlank(pid)
+ || !StringUtils.isNumeric(pid)
+ || StringUtils.isBlank(sys)) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- String bizNo = Objects.requireNonNull(event.getExtension(SendMessageBatchV2RequestBody.BIZSEQNO)).toString();
- String producerGroup = Objects.requireNonNull(event.getExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP)).toString();
+ String bizNo =
+ Objects.requireNonNull(event.getExtension(SendMessageBatchV2RequestBody.BIZSEQNO))
+ .toString();
+ String producerGroup =
+ Objects.requireNonNull(event.getExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP))
+ .toString();
String topic = event.getSubject();
if (StringUtils.isBlank(bizNo)
- || StringUtils.isBlank(topic)
- || StringUtils.isBlank(producerGroup)
- || event.getData() != null) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ || StringUtils.isBlank(topic)
+ || StringUtils.isBlank(producerGroup)
+ || event.getData() != null) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
@@ -136,38 +164,46 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
String user = event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME).toString();
String pass = event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD).toString();
String subsystem = event.getExtension(ProtocolKey.ClientInstanceKey.SYS).toString();
- int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
try {
Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
} catch (Exception e) {
//String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ sendMessageBatchV2ResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
+ e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
- aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
+ aclLogger
+ .warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
return;
}
}
if (!eventMeshHTTPServer.getBatchRateLimiter()
- .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR.getErrMsg()));
+ .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS,
+ TimeUnit.MILLISECONDS)) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.metrics.summaryMetrics
- .recordSendBatchMsgDiscard(1);
+ .recordSendBatchMsgDiscard(1);
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- EventMeshProducer batchEventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
+ EventMeshProducer batchEventMeshProducer =
+ eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
if (!batchEventMeshProducer.getStarted().get()) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getErrMsg()));
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
@@ -175,18 +211,22 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
long batchStartTime = System.currentTimeMillis();
String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
+ // todo: use hashmap to avoid copy
if (StringUtils.isBlank(event.getExtension(SendMessageRequestBody.TTL).toString())
- && !StringUtils.isNumeric(event.getExtension(SendMessageRequestBody.TTL).toString())) {
- event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
+ && !StringUtils.isNumeric(event.getExtension(SendMessageRequestBody.TTL).toString())) {
+ event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl)
+ .build();
}
try {
event = CloudEventBuilder.from(event)
- .withExtension("msgType", "persistent")
- .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
- .build();
+ .withExtension("msgType", "persistent")
+ .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .build();
if (batchMessageLogger.isDebugEnabled()) {
batchMessageLogger.debug("msg2MQMsg suc, topic:{}, msg:{}", topic, event.getData());
}
@@ -194,54 +234,67 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
} catch (Exception e) {
batchMessageLogger.error("msg2MQMsg err, topic:{}, msg:{}", topic, event.getData(), e);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() +
+ EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
eventMeshHTTPServer.metrics.summaryMetrics.recordSendBatchMsg(1);
- final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, batchEventMeshProducer, eventMeshHTTPServer);
+ final SendMessageContext sendMessageContext =
+ new SendMessageContext(bizNo, event, batchEventMeshProducer, eventMeshHTTPServer);
try {
batchEventMeshProducer.send(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
long batchEndTime = System.currentTimeMillis();
- eventMeshHTTPServer.metrics.summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
- batchMessageLogger.debug("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
- bizNo, batchEndTime - batchStartTime, topic);
+ eventMeshHTTPServer.metrics.summaryMetrics
+ .recordBatchSendMsgCost(batchEndTime - batchStartTime);
+ batchMessageLogger.debug(
+ "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
+ bizNo, batchEndTime - batchStartTime, topic);
}
@Override
public void onException(OnExceptionContext context) {
long batchEndTime = System.currentTimeMillis();
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
- eventMeshHTTPServer.metrics.summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
- batchMessageLogger.error("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
- bizNo, batchEndTime - batchStartTime, topic, context.getException());
+ eventMeshHTTPServer.metrics.summaryMetrics
+ .recordBatchSendMsgCost(batchEndTime - batchStartTime);
+ batchMessageLogger.error(
+ "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
+ bizNo, batchEndTime - batchStartTime, topic, context.getException());
}
});
} catch (Exception e) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getErrMsg() +
+ EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(responseEventMeshCommand);
long batchEndTime = System.currentTimeMillis();
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
- eventMeshHTTPServer.metrics.summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
- batchMessageLogger.error("batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
- bizNo, batchEndTime - batchStartTime, topic, e);
+ eventMeshHTTPServer.metrics.summaryMetrics
+ .recordBatchSendMsgCost(batchEndTime - batchStartTime);
+ batchMessageLogger.error(
+ "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
+ bizNo, batchEndTime - batchStartTime, topic, e);
}
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageBatchV2ResponseHeader,
- SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg()));
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
+ EventMeshRetCode.SUCCESS.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
- return;
}
@Override
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index e0341cfd63..3fefd5157b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -17,16 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import io.netty.channel.ChannelHandlerContext;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
@@ -45,9 +35,21 @@
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandlerContext;
+
public class HeartBeatProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
@@ -63,38 +65,42 @@ public HeartBeatProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@Override
public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) throws Exception {
HttpCommand responseEventMeshCommand;
- httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
HeartbeatRequestHeader heartbeatRequestHeader = (HeartbeatRequestHeader) asyncContext.getRequest().getHeader();
HeartbeatRequestBody heartbeatRequestBody = (HeartbeatRequestBody) asyncContext.getRequest().getBody();
HeartbeatResponseHeader heartbeatResponseHeader =
- HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
- IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+ IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(heartbeatRequestHeader.getIdc())
- || StringUtils.isBlank(heartbeatRequestHeader.getPid())
- || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())
- || StringUtils.isBlank(heartbeatRequestHeader.getSys())) {
+ || StringUtils.isBlank(heartbeatRequestHeader.getPid())
+ || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())
+ || StringUtils.isBlank(heartbeatRequestHeader.getSys())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- heartbeatResponseHeader,
- HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ heartbeatResponseHeader,
+ HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//validate body
if (StringUtils.isBlank(heartbeatRequestBody.getClientType())
- || StringUtils.isBlank(heartbeatRequestBody.getConsumerGroup())
- || CollectionUtils.isEmpty(heartbeatRequestBody.getHeartbeatEntities())) {
+ || StringUtils.isBlank(heartbeatRequestBody.getConsumerGroup())
+ || CollectionUtils.isEmpty(heartbeatRequestBody.getHeartbeatEntities())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- heartbeatResponseHeader,
- HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ heartbeatResponseHeader,
+ HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
@@ -126,7 +132,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
}
//do acl check
- if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = heartbeatRequestHeader.getUsername();
String pass = heartbeatRequestHeader.getPasswd();
@@ -137,8 +143,9 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
//String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- heartbeatResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ heartbeatResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
aclLogger.warn("CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
return;
@@ -161,9 +168,11 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
}
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
for (Map.Entry> groupTopicClientMapping : tmp.entrySet()) {
- List localClientList = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey());
+ List localClientList =
+ eventMeshHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey());
if (CollectionUtils.isEmpty(localClientList)) {
- eventMeshHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
+ eventMeshHTTPServer.localClientInfoMapping
+ .put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
} else {
List tmpClientList = groupTopicClientMapping.getValue();
supplyClientInfoList(tmpClientList, localClientList);
@@ -184,24 +193,24 @@ public void onResponse(HttpCommand httpCommand) {
httpLogger.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
- eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+ System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
}
}
};
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- heartbeatResponseHeader,
- HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ heartbeatResponseHeader,
+ HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err);
long endTime = System.currentTimeMillis();
httpLogger.error("message|eventMesh2mq|REQ|ASYNC|heartBeatMessageCost={}ms",
- endTime - startTime, e);
+ endTime - startTime, e);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 7630315733..5b2914d6d6 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -27,6 +27,7 @@
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.ReplyMessageResponseBody;
@@ -81,7 +82,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
// ReplyMessageRequestBody replyMessageRequestBody = (ReplyMessageRequestBody) asyncContext.getRequest().getBody();
String protocolType = replyMessageRequestHeader.getProtocolType();
- ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
ReplyMessageResponseHeader replyMessageResponseHeader =
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 768b0cc3bf..d3e05d1cd8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -26,6 +26,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.LiteMessage;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
@@ -90,7 +91,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
String protocolType = sendMessageRequestHeader.getProtocolType();
- ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
//validate event
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 699325f407..bdae55e770 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -21,16 +21,14 @@
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.RequestReplyCallback;
-import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
-import org.apache.eventmesh.common.LiteMessage;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
@@ -44,7 +42,6 @@
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.apache.eventmesh.runtime.util.OMSUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.lang3.StringUtils;
@@ -84,10 +81,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
EventMeshConstants.PROTOCOL_HTTP,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
-
- String protocolType = sendMessageRequestHeader.getProtocolType();
- ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor("cloudevents");
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
SendMessageResponseHeader sendMessageResponseHeader =
@@ -106,45 +100,58 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString();
- String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString();
- String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
+ String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
+ .toString();
+ String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
+ .toString();
+ String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
+ .toString();
//validate event-extension
if (StringUtils.isBlank(idc)
- || StringUtils.isBlank(pid)
- || !StringUtils.isNumeric(pid)
- || StringUtils.isBlank(sys)) {
+ || StringUtils.isBlank(pid)
+ || !StringUtils.isNumeric(pid)
+ || StringUtils.isBlank(sys)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- String bizNo = Objects.requireNonNull(event.getExtension(SendMessageRequestBody.BIZSEQNO)).toString();
- String uniqueId = Objects.requireNonNull(event.getExtension(SendMessageRequestBody.UNIQUEID)).toString();
- String producerGroup = Objects.requireNonNull(event.getExtension(SendMessageRequestBody.PRODUCERGROUP)).toString();
+ String bizNo =
+ Objects.requireNonNull(event.getExtension(SendMessageRequestBody.BIZSEQNO)).toString();
+ String uniqueId =
+ Objects.requireNonNull(event.getExtension(SendMessageRequestBody.UNIQUEID)).toString();
+ String producerGroup =
+ Objects.requireNonNull(event.getExtension(SendMessageRequestBody.PRODUCERGROUP))
+ .toString();
String topic = event.getSubject();
- String ttl = Objects.requireNonNull(event.getExtension(SendMessageRequestBody.TTL)).toString();
+ String ttl =
+ Objects.requireNonNull(event.getExtension(SendMessageRequestBody.TTL)).toString();
//validate body
if (StringUtils.isBlank(bizNo)
- || StringUtils.isBlank(uniqueId)
- || StringUtils.isBlank(producerGroup)
- || StringUtils.isBlank(topic)
- || event.getData() != null
- || StringUtils.isBlank(ttl)) {
+ || StringUtils.isBlank(uniqueId)
+ || StringUtils.isBlank(producerGroup)
+ || StringUtils.isBlank(topic)
+ || event.getData() != null
+ || StringUtils.isBlank(ttl)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
@@ -257,10 +264,11 @@ public void onSuccess(CloudEvent event) {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .build();
- final String rtnMsg = new String(event.getData().toBytes(), EventMeshConstants.DEFAULT_CHARSET);
+ .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .build();
+ final String rtnMsg = new String(event.getData().toBytes(),
+ EventMeshConstants.DEFAULT_CHARSET);
HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index d91b462b64..32ddee17b8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -72,18 +72,20 @@ public SubscribeProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext)
throws Exception {
HttpCommand responseEventMeshCommand;
+ final HttpCommand request = asyncContext.getRequest();
+ final Integer requestCode = Integer.valueOf(asyncContext.getRequest().getRequestCode());
+
httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
- RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ RequestCode.get(requestCode),
EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- SubscribeRequestHeader subscribeRequestHeader =
- (SubscribeRequestHeader) asyncContext.getRequest().getHeader();
- SubscribeRequestBody subscribeRequestBody =
- (SubscribeRequestBody) asyncContext.getRequest().getBody();
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress()
+ );
+ SubscribeRequestHeader subscribeRequestHeader = (SubscribeRequestHeader) request.getHeader();
+ SubscribeRequestBody subscribeRequestBody = (SubscribeRequestBody) request.getBody();
SubscribeResponseHeader subscribeResponseHeader =
SubscribeResponseHeader
- .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+ .buildHeader(requestCode,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
@@ -94,7 +96,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
|| StringUtils.isBlank(subscribeRequestHeader.getPid())
|| !StringUtils.isNumeric(subscribeRequestHeader.getPid())
|| StringUtils.isBlank(subscribeRequestHeader.getSys())) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
@@ -108,7 +110,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
|| CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
|| StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
@@ -124,7 +126,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext
String user = subscribeRequestHeader.getUsername();
String pass = subscribeRequestHeader.getPasswd();
String subsystem = subscribeRequestHeader.getSys();
- int requestCode = Integer.valueOf(subscribeRequestHeader.getCode());
for (SubscriptionItem item : subTopicList) {
try {
Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(),
@@ -232,16 +233,14 @@ public void onResponse(HttpCommand httpCommand) {
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
- System.currentTimeMillis()
- - asyncContext.getRequest().getReqTime());
+ System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
// ignore
}
}
};
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index df3921dd9b..45b567db0a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -208,9 +208,8 @@ public void onResponse(HttpCommand httpCommand) {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(),
- EventMeshRetCode.SUCCESS.getErrMsg());
+ responseEventMeshCommand =
+ asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
@@ -236,9 +235,8 @@ public void onResponse(HttpCommand httpCommand) {
try {
eventMeshHTTPServer.getConsumerManager()
.notifyConsumerManager(consumerGroup, null);
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(),
- EventMeshRetCode.SUCCESS.getErrMsg());
+ responseEventMeshCommand =
+ asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
// clean ClientInfo
eventMeshHTTPServer.localClientInfoMapping.keySet()
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 1559e697dc..5f31355cb8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -21,6 +21,7 @@
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.RandomStringUtil;
import org.apache.eventmesh.common.exception.JsonException;
import org.apache.eventmesh.common.protocol.SubscriptionType;
@@ -117,11 +118,13 @@ public void tryHTTPRequest() {
try {
String protocolType = Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_TYPE)).toString();
- ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
- content = (String) protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
+ // todo
+ ProtocolTransportObject protocolTransportObject =
+ protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
-// content =
+ // content =
// new String(handleMsgContext.getEvent().getData().toBytes(), EventMeshConstants.DEFAULT_CHARSET);
} catch (Exception ex) {
return;
@@ -300,13 +303,11 @@ private void addToWaitingMap(AsyncHTTPPushRequest request) {
waitingRequests
.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
- return;
}
private void removeWaitingMap(AsyncHTTPPushRequest request) {
if (waitingRequests.containsKey(request.handleMsgContext.getConsumerGroup())) {
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).remove(request);
- return;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 176fe5db9c..4c595e1b77 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -27,6 +27,7 @@
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
@@ -67,7 +68,7 @@ public void run() {
if (pkg.getHeader().getProperties() != null && pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) {
protocolType = (String) pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE);
}
- ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+ ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
Package msg = new Package();
// EventMeshMessage eventMeshMessage = (EventMeshMessage) pkg.getBody();
@@ -188,16 +189,6 @@ public void onException(OnExceptionContext context) {
Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
}
-// @Override
-// public void onException(Throwable e) {
-// session.getSender().getUpstreamBuff().release();
-// session.getSender().failMsgCount.incrementAndGet();
-// messageLogger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
-// (System.currentTimeMillis() - createTime), new Exception(e));
-// msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader().getSeq()));
-// msg.setBody(accessMessage);
-// Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
-// }
};
}
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java
index 20302af8e6..df72fac7e7 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.client.http;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.ssl.MyX509TrustManager;
import org.apache.eventmesh.client.http.util.HttpLoadBalanceUtils;
@@ -30,10 +32,9 @@
import javax.net.ssl.TrustManager;
import java.security.SecureRandom;
+@Slf4j
public abstract class AbstractLiteClient {
- public Logger logger = LoggerFactory.getLogger(AbstractLiteClient.class);
-
protected LiteClientConfig liteClientConfig;
protected LoadBalanceSelector eventMeshServerSelector;
@@ -51,7 +52,7 @@ public LiteClientConfig getLiteClientConfig() {
}
public void shutdown() throws Exception {
- logger.info("AbstractLiteClient shutdown");
+ log.info("AbstractLiteClient shutdown");
}
public CloseableHttpClient setHttpClient() throws Exception {
@@ -67,7 +68,7 @@ public CloseableHttpClient setHttpClient() throws Exception {
return HttpClients.custom().setSSLContext(sslContext)
.setSSLHostnameVerifier(new DefaultHostnameVerifier()).build();
} catch (Exception e) {
- logger.error("Error in creating HttpClient.", e);
+ log.error("Error in creating HttpClient.", e);
throw e;
}
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java
deleted file mode 100644
index daf49ea336..0000000000
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.client.http;
-
-import org.apache.eventmesh.client.http.consumer.listener.LiteMessageListener;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
-
-public class RemotingServer {
-
- public static final Logger logger = LoggerFactory.getLogger(RemotingServer.class);
-
- public static final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
-
- public static final AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);
-
- private EventLoopGroup bossGroup;
-
- private EventLoopGroup workerGroup;
-
- private DefaultHttpDataFactory defaultHttpDataFactory = new DefaultHttpDataFactory(false);
-
- private ThreadPoolExecutor consumeExecutor;
-
- private LiteMessageListener messageListener;
-
- public RemotingServer() {
- }
-
- public RemotingServer(ThreadPoolExecutor consumeExecutor) {
- this.consumeExecutor = consumeExecutor;
- }
-
- public void setConsumeExecutor(ThreadPoolExecutor consumeExecutor) {
- this.consumeExecutor = consumeExecutor;
- }
-
- // TODO: Let different topics have different listeners
- public void registerMessageListener(LiteMessageListener eventMeshMessageListener) {
- this.messageListener = eventMeshMessageListener;
- }
-
- private EventLoopGroup initBossGroup() {
- bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
- AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "endPointBoss-" + count.incrementAndGet());
- t.setDaemon(true);
- return t;
- }
- });
-
- return bossGroup;
- }
-
- private EventLoopGroup initWorkerGroup() {
- workerGroup = new NioEventLoopGroup(2, new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("endpointWorker-")
- .build()
- );
- return workerGroup;
- }
-
- public void init() throws Exception {
- initBossGroup();
- initWorkerGroup();
- inited.compareAndSet(false, true);
- }
-
-}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index 373ca8a5c8..2dad1c6da6 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -48,19 +48,16 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.handler.codec.http.HttpMethod;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class LiteConsumer extends AbstractLiteClient {
- public static final Logger logger = LoggerFactory.getLogger(LiteConsumer.class);
-
private ThreadPoolExecutor consumeExecutor;
protected LiteClientConfig eventMeshClientConfig;
@@ -75,11 +72,10 @@ public class LiteConsumer extends AbstractLiteClient {
);
public LiteConsumer(LiteClientConfig liteClientConfig) {
- super(liteClientConfig);
- this.consumeExecutor =
+ this(liteClientConfig,
ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(),
- liteClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-");
- this.eventMeshClientConfig = liteClientConfig;
+ liteClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-")
+ );
}
public LiteConsumer(LiteClientConfig liteClientConfig,
@@ -94,24 +90,32 @@ public void start() throws Exception {
Preconditions.checkNotNull(eventMeshClientConfig,
"EventMeshClientConfig can't be null");
Preconditions.checkNotNull(consumeExecutor, "consumeExecutor can't be null");
- logger.info("LiteConsumer starting");
+ log.info("LiteConsumer starting");
super.start();
started.compareAndSet(false, true);
- logger.info("LiteConsumer started");
+ log.info("LiteConsumer started");
}
@Override
public void shutdown() throws Exception {
- logger.info("LiteConsumer shutting down");
+ log.info("LiteConsumer shutting down");
super.shutdown();
if (consumeExecutor != null) {
consumeExecutor.shutdown();
}
scheduler.shutdown();
started.compareAndSet(true, false);
- logger.info("LiteConsumer shutdown");
+ log.info("LiteConsumer shutdown");
}
+ /**
+ * When receive message will callback the url.
+ *
+ * @param topicList topic that be subscribed
+ * @param url url will be trigger
+ * @return true if subscribe success
+ * @throws Exception
+ */
public boolean subscribe(List topicList, String url) throws Exception {
subscription.addAll(topicList);
if (!started.get()) {
@@ -128,8 +132,8 @@ public boolean subscribe(List topicList, String url) throws Ex
subRes = HttpUtil.post(httpClient, target, subscribeParam);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (log.isDebugEnabled()) {
+ log.debug(
"subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, "
+ "rtn:{}", target, (System.nanoTime() - startTime) / 1000000,
JsonUtils.serialize(subscribeParam), subRes);
@@ -138,6 +142,7 @@ public boolean subscribe(List topicList, String url) throws Ex
EventMeshRetObj ret = JsonUtils.deserialize(subRes, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
+ // todo: remove return result
return true;
} else {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
@@ -214,8 +219,8 @@ public void run() {
res = HttpUtil.post(httpClient, target, requestParam);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (log.isDebugEnabled()) {
+ log.debug(
"heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}",
target, (System.nanoTime() - startTime) / 1000000, res);
}
@@ -226,7 +231,7 @@ public void run() {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
} catch (Exception e) {
- logger.error("send heartBeat error", e);
+ log.error("send heartBeat error", e);
}
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
@@ -246,8 +251,8 @@ public boolean unsubscribe(List topicList, String url) throws Exception
unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (log.isDebugEnabled()) {
+ log.debug(
"unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, "
+ "rtn:{}", target, (System.nanoTime() - startTime) / 1000000,
JsonUtils.serialize(unSubscribeParam), unSubRes);
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java
index 2c7c050d2a..de8c82f1ac 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java
@@ -24,14 +24,14 @@
import java.util.Map;
import io.netty.handler.codec.http.HttpMethod;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Slf4j
public class RequestParam {
- public Logger logger = LoggerFactory.getLogger(this.getClass());
-
private Map queryParams;
private HttpMethod httpMethod;
@@ -90,7 +90,7 @@ public String getQueryParams() {
}
}
} catch (UnsupportedEncodingException e) {
- logger.error("get query params failed.", e);
+ log.error("get query params failed.", e);
return "";
}
return stringBuilder.substring(1);
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
index 9b0b302f7b..0fadad6222 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
@@ -45,11 +45,11 @@
import com.google.common.base.Preconditions;
import io.netty.handler.codec.http.HttpMethod;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class LiteProducer extends AbstractLiteClient {
- public static final Logger logger = LoggerFactory.getLogger(LiteProducer.class);
-
public LiteProducer(LiteClientConfig liteClientConfig) {
super(liteClientConfig);
}
@@ -65,10 +65,10 @@ public void start() throws Exception {
if (started.get()) {
return;
}
- logger.info("LiteProducer starting");
+ log.info("LiteProducer starting");
super.start();
started.compareAndSet(false, true);
- logger.info("LiteProducer started");
+ log.info("LiteProducer started");
}
@Override
@@ -76,10 +76,10 @@ public void shutdown() throws Exception {
if (!started.get()) {
return;
}
- logger.info("LiteProducer shutting down");
+ log.info("LiteProducer shutting down");
super.shutdown();
started.compareAndSet(true, false);
- logger.info("LiteProducer shutdown");
+ log.info("LiteProducer shutdown");
}
public AtomicBoolean getStarted() {
@@ -110,12 +110,11 @@ public boolean publish(LiteMessage message) throws Exception {
.addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.TOPIC, message.getTopic())
.addBody(SendMessageRequestBody.CONTENT, message.getContent())
- .addBody(SendMessageRequestBody.TTL,
- message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
+ .addBody(SendMessageRequestBody.TTL, message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
.addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
.addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
- long startTime = System.currentTimeMillis();
+ long startTime = System.nanoTime();
String target = selectEventMesh();
String res = "";
@@ -123,15 +122,15 @@ public boolean publish(LiteMessage message) throws Exception {
res = HttpUtil.post(httpClient, target, requestParam);
}
- if (logger.isDebugEnabled()) {
- logger.debug("publish async message, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
- target, System.currentTimeMillis() - startTime, message, res);
+ if (log.isDebugEnabled()) {
+ log.debug("publish async message, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
+ target, (System.nanoTime() - startTime) / 1000000, message, res);
}
EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
- return Boolean.TRUE;
+ return true;
} else {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
@@ -181,8 +180,8 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception {
res = HttpUtil.post(httpClient, target, requestParam);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (log.isDebugEnabled()) {
+ log.debug(
"publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
target, (System.nanoTime() - startTime) / 1000000, message, res);
}
@@ -238,8 +237,8 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th
new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
}
- if (logger.isDebugEnabled()) {
- logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target,
+ if (log.isDebugEnabled()) {
+ log.debug("publish sync message by async, target:{}, cost:{}, message:{}", target,
(System.nanoTime() - startTime) / 1000000, message);
}
}
diff --git a/style/checkStyle.xml b/style/checkStyle.xml
index 9e86d5537d..acea7a70b5 100644
--- a/style/checkStyle.xml
+++ b/style/checkStyle.xml
@@ -54,7 +54,7 @@
-
+