From 90e7864b6f8bc8d8f9a69d8bd46254e6cb150efa Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Mon, 15 Nov 2021 21:07:31 +0800 Subject: [PATCH] Remove some unused code in sdk module --- .../eventmesh/common/EventMeshException.java | 7 +- eventmesh-sdk-java/build.gradle | 6 + .../eventmesh/client/http/RemotingServer.java | 358 +----------------- .../client/http/consumer/LiteConsumer.java | 82 ++-- .../client/http/http/RequestParam.java | 4 +- .../client/http/producer/LiteProducer.java | 33 +- 6 files changed, 73 insertions(+), 417 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshException.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshException.java index bffa57e780..620bdf9fc5 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshException.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshException.java @@ -34,13 +34,12 @@ public EventMeshException(Throwable cause) { super(cause); } - public EventMeshException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public EventMeshException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } public EventMeshException(Integer errCode, String errMsg) { - super((new StringBuilder()).append(errCode) - .append("|") - .append(errMsg).toString()); + super(String.format("errorCode: %s, errorMessage: %s", errCode, errMsg)); } } diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle index 0db0df4b2b..f62840dfe5 100644 --- a/eventmesh-sdk-java/build.gradle +++ b/eventmesh-sdk-java/build.gradle @@ -38,4 +38,10 @@ dependencies { testImplementation "io.netty:netty-all" testImplementation "org.apache.httpcomponents:httpclient" + compileOnly 'org.projectlombok:lombok:1.18.22' + annotationProcessor 'org.projectlombok:lombok:1.18.22' + + testCompileOnly 'org.projectlombok:lombok:1.18.22' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.22' + } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java index 3c98d819c0..daf49ea336 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java @@ -17,78 +17,34 @@ package org.apache.eventmesh.client.http; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.eventmesh.client.http.consumer.listener.LiteMessageListener; + import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.QueryStringDecoder; -import io.netty.handler.codec.http.multipart.Attribute; import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; -import io.netty.handler.codec.http.multipart.InterfaceHttpData; - -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.RandomUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.client.http.consumer.HandleResult; -import org.apache.eventmesh.client.http.consumer.context.LiteConsumeContext; -import org.apache.eventmesh.client.http.consumer.listener.LiteMessageListener; -import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.IPUtil; -import org.apache.eventmesh.common.LiteMessage; -import org.apache.eventmesh.common.ThreadUtil; -import org.apache.eventmesh.common.command.HttpCommand; -import org.apache.eventmesh.common.protocol.http.body.Body; -import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; -import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; -import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; -import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; -import org.apache.eventmesh.common.protocol.http.common.RequestCode; -import org.apache.eventmesh.common.protocol.http.header.Header; -import org.apache.eventmesh.common.protocol.http.header.message.PushMessageRequestHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class RemotingServer { - public Logger logger = LoggerFactory.getLogger(this.getClass()); + public static final Logger logger = LoggerFactory.getLogger(RemotingServer.class); - public AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); + public static final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); - public AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE); + public static final AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE); private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - private int port = RandomUtils.nextInt(1000, 20000); - private DefaultHttpDataFactory defaultHttpDataFactory = new DefaultHttpDataFactory(false); private ThreadPoolExecutor consumeExecutor; @@ -98,23 +54,10 @@ public class RemotingServer { public RemotingServer() { } - public RemotingServer(int port) { - this.port = port; - } - public RemotingServer(ThreadPoolExecutor consumeExecutor) { this.consumeExecutor = consumeExecutor; } - public RemotingServer(int port, ThreadPoolExecutor consumeExecutor) { - this.port = port; - this.consumeExecutor = consumeExecutor; - } - - public void setPort(int port) { - this.port = port; - } - public void setConsumeExecutor(ThreadPoolExecutor consumeExecutor) { this.consumeExecutor = consumeExecutor; } @@ -139,286 +82,19 @@ public Thread newThread(Runnable r) { return bossGroup; } - private EventLoopGroup initWokerGroup() { - workerGroup = new NioEventLoopGroup(2, new ThreadFactory() { - AtomicInteger count = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "endpointWorker-" + count.incrementAndGet()); - return t; - } - }); + private EventLoopGroup initWorkerGroup() { + workerGroup = new NioEventLoopGroup(2, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("endpointWorker-") + .build() + ); return workerGroup; } - public String getEndpointURL() { - return String.format("http://%s:%s", IPUtil.getLocalAddress(), port); - } - - - class HTTPHandler extends SimpleChannelInboundHandler { - - /** - * Parse request HEADER - * - * @param fullReq - * @return - */ - private Map parseHTTPHeader(HttpRequest fullReq) { - Map headerParam = new HashMap<>(); - for (String key : fullReq.headers().names()) { - if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key) - || StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) { - continue; - } - headerParam.put(key, fullReq.headers().get(key)); - } - return headerParam; - } - - @Override - protected void channelRead0(final ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception { - HttpPostRequestDecoder decoder = null; - try { - if (!httpRequest.decoderResult().isSuccess()) { - sendError(ctx, HttpResponseStatus.BAD_REQUEST); - return; - } - - // Protocol version verification - String protocolVersion = StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.VERSION)); - if (StringUtils.isBlank(protocolVersion) || !ProtocolVersion.contains(protocolVersion)) { - httpRequest.headers().set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); - } - - Map bodyMap = new HashMap<>(); - - if (httpRequest.method() == HttpMethod.GET) { - QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri()); - for (Map.Entry> entry : getDecoder.parameters().entrySet()) { - bodyMap.put(entry.getKey(), entry.getValue().get(0)); - } - } else if (httpRequest.method() == HttpMethod.POST) { - decoder = new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest); - List parmList = decoder.getBodyHttpDatas(); - for (InterfaceHttpData parm : parmList) { - if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { - Attribute data = (Attribute) parm; - bodyMap.put(data.getName(), data.getValue()); - } - } - } else { - sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); - return; - } - - /////////////////////////////////////////////////////////////////Basic inspection//////////////////////////////////////////////////// - String requestCode = - (httpRequest.method() == HttpMethod.POST) ? StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.REQUEST_CODE)) - : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), ""); - - final HttpCommand requestCommand = new HttpCommand( - httpRequest.method().name(), - httpRequest.protocolVersion().protocolName(), requestCode); - - HttpCommand responseCommand; - - // Verify requestCode - if (StringUtils.isBlank(requestCode) - || !StringUtils.isNumeric(requestCode) - || (!String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode()).equals(requestCode) - && !String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode()).equals(requestCode))) { - logger.error("receive invalid requestCode, {}", requestCode); - responseCommand = requestCommand.createHttpCommandResponse(ClientRetCode.OK.getRetCode(), ClientRetCode.OK.getErrMsg()); - sendResponse(ctx, responseCommand.httpResponse()); - return; - } - - requestCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest))); - requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); - - if (logger.isDebugEnabled()) { - logger.debug("{}", requestCommand); - } - - PushMessageRequestHeader pushMessageRequestHeader = (PushMessageRequestHeader) requestCommand.header; - PushMessageRequestBody pushMessageRequestBody = (PushMessageRequestBody) requestCommand.body; - - String topic = pushMessageRequestBody.getTopic(); - - // Check if there is a listener for the TOPIC -// if (!listenerTable.containsKey(topic)) { -// logger.error("no listenning for this topic, {}", topic); -// responseCommand = requestCommand.createHttpCommandResponse(ClientRetCode.NOLISTEN.getRetCode(), ClientRetCode.NOLISTEN.getErrMsg()); -// sendResponse(ctx, responseCommand.httpResponse()); -// return; -// } - - final LiteConsumeContext eventMeshConsumeContext = new LiteConsumeContext(pushMessageRequestHeader.getEventMeshIp(), - pushMessageRequestHeader.getEventMeshEnv(), pushMessageRequestHeader.getEventMeshIdc(), - pushMessageRequestHeader.getEventMeshCluster()); - - final LiteMessage liteMessage = new LiteMessage(pushMessageRequestBody.getBizSeqNo(), pushMessageRequestBody.getUniqueId(), - topic, pushMessageRequestBody.getContent()); - - for (Map.Entry entry : pushMessageRequestBody.getExtFields().entrySet()) { - liteMessage.addProp(entry.getKey(), entry.getValue()); - } - - // Transfer to the consumer thread pool - consumeExecutor.execute(new Runnable() { - @Override - public void run() { - try { - if (messageListener.reject()) { - HttpCommand responseCommand = requestCommand.createHttpCommandResponse(handleResult2ClientRetCode(HandleResult.RETRY).getRetCode(), handleResult2ClientRetCode(HandleResult.RETRY).getErrMsg()); - sendResponse(ctx, responseCommand.httpResponse()); - return; - } - - HandleResult handleResult = messageListener.handle(liteMessage, eventMeshConsumeContext); - - if (logger.isDebugEnabled()) { - logger.info("bizSeqNo:{}, topic:{}, handleResult:{}", liteMessage.getBizSeqNo(), liteMessage.getTopic(), handleResult); - } - - HttpCommand responseCommand = requestCommand.createHttpCommandResponse(handleResult2ClientRetCode(handleResult).getRetCode(), handleResult2ClientRetCode(handleResult).getErrMsg()); - sendResponse(ctx, responseCommand.httpResponse()); - } catch (Exception e) { - logger.error("process error", e); - } - } - }); - } catch (Exception ex) { - logger.error("HTTPHandler.channelRead0 err", ex); - } finally { - try { - decoder.destroy(); - } catch (Exception e) { - } - } - } - - public ClientRetCode handleResult2ClientRetCode(HandleResult handleResult) { - if (handleResult == HandleResult.OK) { - return ClientRetCode.OK; - } else if (handleResult == HandleResult.FAIL) { - return ClientRetCode.FAIL; - } else if (handleResult == HandleResult.NOLISTEN) { - return ClientRetCode.NOLISTEN; - } else if (handleResult == HandleResult.RETRY) { - return ClientRetCode.RETRY; - } else { - return ClientRetCode.OK; - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - super.channelReadComplete(ctx); - ctx.flush(); // 4 - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (null != cause) cause.printStackTrace(); - if (null != ctx) ctx.close(); - } - - /** - * Send default error page - * - * @param ctx - * @param status - */ - private void sendError(ChannelHandlerContext ctx, - HttpResponseStatus status) { - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, - status); - response.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + - "; charset=" + Constants.DEFAULT_CHARSET); - response.headers().add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); - response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - - /** - * Send response - * - * @param ctx - * @param response - */ - private void sendResponse(ChannelHandlerContext ctx, - DefaultFullHttpResponse response) { - ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - if (!f.isSuccess()) { - logger.warn("send response to [{}] fail, will close this channel", IPUtil.parseChannelRemoteAddr(f.channel())); - f.channel().close(); - return; - } - } - }); - } - - public void shutdown() throws Exception { - if (bossGroup != null) { - bossGroup.shutdownGracefully(); - } - - ThreadUtil.randomSleep(30); - - if (workerGroup != null) { - workerGroup.shutdownGracefully(); - } - - started.compareAndSet(true, false); - inited.compareAndSet(true, false); - } - } - public void init() throws Exception { initBossGroup(); - initWokerGroup(); + initWorkerGroup(); inited.compareAndSet(false, true); } - public void start() throws Exception { - Runnable r = new Runnable() { - @Override - public void run() { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) - throws Exception { - ch.pipeline() - .addLast(new HttpRequestDecoder(), - new HttpResponseEncoder(), - new HttpObjectAggregator(Integer.MAX_VALUE), - new HTTPHandler()); // 4 - } - }).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); - try { - logger.info("EventMesh Client[{}] Started......", port); - ChannelFuture future = b.bind(port).sync(); - future.channel().closeFuture().sync(); - started.compareAndSet(false, true); - } catch (Exception e) { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } - } - }; - - - Thread t = new Thread(r, "eventMesh-client-remoting-server"); - t.start(); - } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 7915d40245..373ca8a5c8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -19,13 +19,10 @@ import org.apache.eventmesh.client.http.AbstractLiteClient; import org.apache.eventmesh.client.http.EventMeshRetObj; -import org.apache.eventmesh.client.http.RemotingServer; import org.apache.eventmesh.client.http.conf.LiteClientConfig; -import org.apache.eventmesh.client.http.consumer.listener.LiteMessageListener; import org.apache.eventmesh.client.http.http.HttpUtil; import org.apache.eventmesh.client.http.http.RequestParam; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.EventMeshThreadFactoryImpl; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.ThreadPoolFactory; @@ -44,7 +41,6 @@ import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -57,34 +53,33 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.handler.codec.http.HttpMethod; public class LiteConsumer extends AbstractLiteClient { - public Logger logger = LoggerFactory.getLogger(LiteConsumer.class); - - private RemotingServer remotingServer; + public static final Logger logger = LoggerFactory.getLogger(LiteConsumer.class); private ThreadPoolExecutor consumeExecutor; protected LiteClientConfig eventMeshClientConfig; - private List subscription = Lists.newArrayList(); + private static final List subscription = Lists.newArrayList(); - private LiteMessageListener messageListener; + private static final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); - protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, - new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); + protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build() + ); - public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception { + public LiteConsumer(LiteClientConfig liteClientConfig) { super(liteClientConfig); this.consumeExecutor = ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(), - liteClientConfig.getConsumeThreadMax(), "eventMesh-client-consume-"); + liteClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-"); this.eventMeshClientConfig = liteClientConfig; -// this.remotingServer = new RemotingServer(10106, consumeExecutor); -// this.remotingServer.init(); } public LiteConsumer(LiteClientConfig liteClientConfig, @@ -92,22 +87,17 @@ public LiteConsumer(LiteClientConfig liteClientConfig, super(liteClientConfig); this.consumeExecutor = customExecutor; this.eventMeshClientConfig = liteClientConfig; -// this.remotingServer = new RemotingServer(this.consumeExecutor); } - private final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); - @Override public void start() throws Exception { - Preconditions - .checkState(eventMeshClientConfig != null, "eventMeshClientConfig can't be null"); - Preconditions.checkState(consumeExecutor != null, "consumeExecutor can't be null"); -// Preconditions.checkState(messageListener != null, "messageListener can't be null"); + Preconditions.checkNotNull(eventMeshClientConfig, + "EventMeshClientConfig can't be null"); + Preconditions.checkNotNull(consumeExecutor, "consumeExecutor can't be null"); logger.info("LiteConsumer starting"); super.start(); started.compareAndSet(false, true); logger.info("LiteConsumer started"); -// this.remotingServer.start(); } @Override @@ -130,7 +120,7 @@ public boolean subscribe(List topicList, String url) throws Ex RequestParam subscribeParam = generateSubscribeRequestParam(topicList, url); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); String target = selectEventMesh(); String subRes = ""; @@ -141,14 +131,14 @@ public boolean subscribe(List topicList, String url) throws Ex if (logger.isDebugEnabled()) { logger.debug( "subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, " - + "rtn:{}", target, System.currentTimeMillis() - startTime, + + "rtn:{}", target, (System.nanoTime() - startTime) / 1000000, JsonUtils.serialize(subscribeParam), subRes); } EventMeshRetObj ret = JsonUtils.deserialize(subRes, EventMeshRetObj.class); if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) { - return Boolean.TRUE; + return true; } else { throw new EventMeshException(ret.getRetCode(), ret.getRetMsg()); } @@ -157,13 +147,9 @@ public boolean subscribe(List topicList, String url) throws Ex private RequestParam generateSubscribeRequestParam(List topicList, String url) { -// final LiteMessage liteMessage = new LiteMessage(); -// liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) -// .setContent("subscribe message") -// .setUniqueId(RandomStringUtils.randomNumeric(30)); RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.SUBSCRIBE.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.SUBSCRIBE.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp()) @@ -191,8 +177,8 @@ private RequestParam generateHeartBeatRequestParam(List topics } RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.HEARTBEAT.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.HEARTBEAT.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp()) @@ -220,7 +206,7 @@ public void run() { } RequestParam requestParam = generateHeartBeatRequestParam(topicList, url); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); String target = selectEventMesh(); String res = ""; @@ -231,7 +217,7 @@ public void run() { if (logger.isDebugEnabled()) { logger.debug( "heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}", - target, System.currentTimeMillis() - startTime, res); + target, (System.nanoTime() - startTime) / 1000000, res); } EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class); @@ -248,17 +234,11 @@ public void run() { public boolean unsubscribe(List topicList, String url) throws Exception { Set unSub = new HashSet<>(topicList); - Iterator itr = subscription.iterator(); - while (itr.hasNext()) { - SubscriptionItem item = itr.next(); - if (unSub.contains(item.getTopic())) { - itr.remove(); - } - } + subscription.removeIf(item -> unSub.contains(item.getTopic())); RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); String target = selectEventMesh(); String unSubRes = ""; @@ -269,14 +249,14 @@ public boolean unsubscribe(List topicList, String url) throws Exception if (logger.isDebugEnabled()) { logger.debug( "unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, " - + "rtn:{}", target, System.currentTimeMillis() - startTime, + + "rtn:{}", target, (System.nanoTime() - startTime) / 1000000, JsonUtils.serialize(unSubscribeParam), unSubRes); } EventMeshRetObj ret = JsonUtils.deserialize(unSubRes, EventMeshRetObj.class); if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) { - return Boolean.TRUE; + return true; } else { throw new EventMeshException(ret.getRetCode(), ret.getRetMsg()); } @@ -284,8 +264,8 @@ public boolean unsubscribe(List topicList, String url) throws Exception private RequestParam generateUnSubscribeRequestParam(List topicList, String url) { RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.UNSUBSCRIBE.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp()) @@ -302,12 +282,6 @@ private RequestParam generateUnSubscribeRequestParam(List topicList, Str return requestParam; } - public void registerMessageListener(LiteMessageListener messageListener) - throws EventMeshException { - this.messageListener = messageListener; - remotingServer.registerMessageListener(this.messageListener); - } - public String selectEventMesh() { if (liteClientConfig.isUseTls()) { return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java index 4d23222352..2c7c050d2a 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/http/RequestParam.java @@ -113,11 +113,11 @@ public RequestParam addQueryParam(String key, String value) { return this; } - public RequestParam addHeader(String key, String value) { + public RequestParam addHeader(String key, Object value) { if (headers == null) { headers = new HashMap<>(); } - headers.put(key, value); + headers.put(key, value.toString()); return this; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java index 376e6e6de1..9b0b302f7b 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java @@ -48,19 +48,20 @@ public class LiteProducer extends AbstractLiteClient { - public Logger logger = LoggerFactory.getLogger(LiteProducer.class); + public static final Logger logger = LoggerFactory.getLogger(LiteProducer.class); public LiteProducer(LiteClientConfig liteClientConfig) { super(liteClientConfig); } - private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); + private static final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); @Override public void start() throws Exception { - Preconditions.checkState(liteClientConfig != null, "liteClientConfig can't be null"); - Preconditions.checkState(liteClientConfig.getLiteEventMeshAddr() != null, - "liteClientConfig.liteServerAddr can't be null"); + Preconditions.checkNotNull(liteClientConfig, "liteClientConfig can't be null"); + Preconditions.checkNotNull( + liteClientConfig.getLiteEventMeshAddr(), "liteClientConfig.liteServerAddr can't be null" + ); if (started.get()) { return; } @@ -94,8 +95,8 @@ public boolean publish(LiteMessage message) throws Exception { Preconditions.checkState(StringUtils.isNotBlank(message.getContent()), "eventMeshMessage[content] invalid"); RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_ASYNC.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp()) @@ -153,8 +154,8 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { Preconditions.checkState(StringUtils.isNotBlank(message.getContent()), "eventMeshMessage[content] invalid"); RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp()) @@ -172,7 +173,7 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo()) .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId()); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); String target = selectEventMesh(); String res = ""; @@ -183,7 +184,7 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { if (logger.isDebugEnabled()) { logger.debug( "publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}", - target, System.currentTimeMillis() - startTime, message, res); + target, (System.nanoTime() - startTime) / 1000000, message, res); } EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class); @@ -195,7 +196,7 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { .setTopic(replyMessage.topic); return eventMeshMessage; } - + // todo: return msg return null; } @@ -210,8 +211,8 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th Preconditions.checkState(ObjectUtils.allNotNull(rrCallback), "rrCallback invalid"); RequestParam requestParam = new RequestParam(HttpMethod.POST); - requestParam.addHeader(ProtocolKey.REQUEST_CODE, - String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode())) + requestParam + .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode()) .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv()) .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc()) .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp()) @@ -229,7 +230,7 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo()) .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId()); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); String target = selectEventMesh(); try (CloseableHttpClient httpClient = setHttpClient()) { @@ -239,7 +240,7 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th if (logger.isDebugEnabled()) { logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, - System.currentTimeMillis() - startTime, message); + (System.nanoTime() - startTime) / 1000000, message); } } }