diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java index 9abbdaf7ce..3c5a0b193c 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java @@ -17,10 +17,6 @@ package org.apache.eventmesh.protocol.meshmessage.resolver.http; -import io.cloudevents.CloudEvent; -import io.cloudevents.SpecVersion; -import io.cloudevents.core.builder.CloudEventBuilder; -import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody; @@ -30,8 +26,14 @@ import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.commons.lang3.StringUtils; + import java.nio.charset.StandardCharsets; +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; + public class SendMessageRequestProtocolResolver { public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHandleException { @@ -62,49 +64,51 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan cloudEventBuilder = CloudEventBuilder.v1(); event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) - .withSubject(sendMessageRequestBody.getTopic()) - .withData(content.getBytes(StandardCharsets.UTF_8)) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) - .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup()) - .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) - .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) - .build(); + .withSubject(sendMessageRequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, + sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) + .build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { cloudEventBuilder = CloudEventBuilder.v03(); event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) - .withSubject(sendMessageRequestBody.getTopic()) - .withData(content.getBytes(StandardCharsets.UTF_8)) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) - .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup()) - .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) - .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) - .build(); + .withSubject(sendMessageRequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, + sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) + .build(); } return event; } catch (Exception e) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractHttpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractHttpClient.java index 057f2805b7..bb415248a4 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractHttpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractHttpClient.java @@ -88,6 +88,7 @@ private CloseableHttpClient setHttpClient() throws EventMeshException { } protected String selectEventMesh() { + // todo: target endpoint maybe destroy, should remove the bad endpoint if (eventMeshHttpClientConfig.isUseTls()) { return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select(); } else { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/EventMeshHttpClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/EventMeshHttpClientConfig.java index 062b2e1b76..24dfd0fee4 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/EventMeshHttpClientConfig.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/EventMeshHttpClientConfig.java @@ -47,7 +47,8 @@ public class EventMeshHttpClientConfig { @Builder.Default private int consumeThreadMax = 5; - private String env; + @Builder.Default + private String env = ""; @Builder.Default private String consumerGroup = "DefaultConsumerGroup"; @@ -55,18 +56,23 @@ public class EventMeshHttpClientConfig { @Builder.Default private String producerGroup = "DefaultProducerGroup"; - private String idc; + @Builder.Default + private String idc = ""; @Builder.Default private String ip = "127.0.0.1"; - private String pid; + @Builder.Default + private String pid = ""; - private String sys; + @Builder.Default + private String sys = ""; - private String userName; + @Builder.Default + private String userName = ""; - private String password; + @Builder.Default + private String password = ""; @Builder.Default private boolean useTls = false; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java index 7b3081e468..00c544a261 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java @@ -37,6 +37,8 @@ import org.apache.eventmesh.common.protocol.http.common.RequestCode; import org.apache.eventmesh.common.utils.JsonUtils; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -45,7 +47,6 @@ import java.util.stream.Collectors; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.handler.codec.http.HttpMethod; @@ -56,7 +57,7 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo private final ThreadPoolExecutor consumeExecutor; - private static final List subscription = Lists.newArrayList(); + private static final List SUBSCRIPTIONS = Collections.synchronizedList(new ArrayList<>()); private final ScheduledThreadPoolExecutor scheduler; @@ -100,7 +101,7 @@ public void subscribe(List topicList, String subscribeUrl) thr if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) { throw new EventMeshException(ret.getRetCode(), ret.getRetMsg()); } - subscription.addAll(topicList); + SUBSCRIPTIONS.addAll(topicList); } catch (Exception ex) { throw new EventMeshException(String.format("Subscribe topic error, target:%s", target), ex); } @@ -156,7 +157,7 @@ public void unsubscribe(List topicList, String unSubscribeUrl) throws Ev throw new EventMeshException(ret.getRetCode(), ret.getRetMsg()); } // todo: avoid concurrentModifiedException - subscription.removeIf(item -> topicList.contains(item.getTopic())); + SUBSCRIPTIONS.removeIf(item -> topicList.contains(item.getTopic())); } catch (Exception ex) { throw new EventMeshException(String.format("Unsubscribe topic error, target:%s", target), ex); } @@ -173,15 +174,6 @@ public void close() throws EventMeshException { log.info("LiteConsumer shutdown"); } - private String selectEventMesh() { - // todo: target endpoint maybe destroy, should remove the bad endpoint - if (eventMeshHttpClientConfig.isUseTls()) { - return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select(); - } else { - return Constants.HTTP_PROTOCOL_PREFIX + eventMeshServerSelector.select(); - } - } - private RequestParam buildCommonRequestParam() { return new RequestParam(HttpMethod.POST) .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv()) @@ -191,6 +183,7 @@ private RequestParam buildCommonRequestParam() { .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys()) .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName()) .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) + // add protocol version? .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java index 9fd83aa101..6d6ff2f63d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/CloudEventProducer.java @@ -20,12 +20,15 @@ import com.google.common.base.Preconditions; import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.handler.codec.http.HttpMethod; import lombok.extern.slf4j.Slf4j; @Slf4j class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocolProducer { + private static final String PROTOCOL_TYPE = "cloudevents"; + public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException { super(eventMeshHttpClientConfig); } @@ -33,8 +36,9 @@ public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) t @Override public void publish(CloudEvent cloudEvent) throws EventMeshException { validateCloudEvent(cloudEvent); + CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent); // todo: Can put to abstract class, all protocol use the same send method? This can be a template method - RequestParam requestParam = buildCommonPostParam(cloudEvent) + RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent) .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_ASYNC.getRequestCode()); String target = selectEventMesh(); try { @@ -53,7 +57,8 @@ public void publish(CloudEvent cloudEvent) throws EventMeshException { @Override public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshException { validateCloudEvent(cloudEvent); - RequestParam requestParam = buildCommonPostParam(cloudEvent) + CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent); + RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent) .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode()) .setTimeout(timeout); String target = selectEventMesh(); @@ -71,15 +76,16 @@ public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshE } @Override - public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) + public void request(final CloudEvent cloudEvent, final RRCallback rrCallback, long timeout) throws EventMeshException { validateCloudEvent(cloudEvent); - RequestParam requestParam = buildCommonPostParam(cloudEvent) + CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent); + RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent) .addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode()) .setTimeout(timeout); String target = selectEventMesh(); - RRCallbackResponseHandlerAdapter adapter = - new RRCallbackResponseHandlerAdapter<>(cloudEvent, rrCallback, timeout); + RRCallbackResponseHandlerAdapter adapter = new RRCallbackResponseHandlerAdapter<>( + enhanceCloudEvent, rrCallback, timeout); try { HttpUtils.post(httpClient, null, target, requestParam, adapter); } catch (IOException e) { @@ -97,14 +103,27 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) { requestParam .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName()) .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword()) - .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()) .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) - // todo: add producerGroup to header, set protocol type, protocol version + .addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + // todo: move producerGroup tp header .addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup()) .addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent)); return requestParam; } + private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent) { + return CloudEventBuilder.from(cloudEvent) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv()) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc()) + .withExtension(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp()) + .withExtension(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid()) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys()) + .withExtension(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA) + .withExtension(ProtocolKey.PROTOCOL_DESC, cloudEvent.getSpecVersion().name()) + .withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString()) + .build(); + } + private CloudEvent transformMessage(EventMeshRetObj retObj) { SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(retObj.getRetMsg(), SendMessageResponseBody.ReplyMessage.class); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java deleted file mode 100644 index fa81cf0faa..0000000000 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp; - -import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.Package; - -import io.cloudevents.CloudEvent; - -/** - * EventMesh Tcp client, it contains all publish/subscribe method. - * todo: Should we only keep EventMeshTcpPubClient/EventMeshTcpSubClient and remove this EventMeshTcpClient? - */ -public interface EventMeshTCPClient { - - // todo: use protocol message instead of Package - Package rr(Package msg, long timeout) throws EventMeshException; - - void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException; - - Package publish(Package msg, long timeout) throws EventMeshException; - - Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException; - - void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException; - - void broadcast(Package msg, long timeout) throws EventMeshException; - - void init() throws EventMeshException; - - void close() throws EventMeshException; - - void heartbeat() throws EventMeshException; - - void listen() throws EventMeshException; - - void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) - throws EventMeshException; - - void unsubscribe() throws EventMeshException; - - void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException; - - void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException; -} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java index f4d9f66a34..821119065e 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java @@ -21,10 +21,14 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; /** * EventMesh TCP publish client. + *
    + *
  • {@link org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPPubClient}
  • + *
  • {@link org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient}
  • + *
  • {@link org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPPubClient}
  • + *
*/ public interface EventMeshTCPPubClient extends AutoCloseable { @@ -34,18 +38,15 @@ public interface EventMeshTCPPubClient extends AutoCloseable { void reconnect() throws EventMeshException; - Package rr(Package msg, long timeout) throws EventMeshException; + // todo: Hide package method, use ProtocolMessage + Package rr(ProtocolMessage msg, long timeout) throws EventMeshException; - void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException; - - Package publish(Package msg, long timeout) throws EventMeshException; + void asyncRR(ProtocolMessage msg, AsyncRRCallback callback, long timeout) throws EventMeshException; Package publish(ProtocolMessage cloudEvent, long timeout) throws EventMeshException; void broadcast(ProtocolMessage cloudEvent, long timeout) throws EventMeshException; - void broadcast(Package msg, long timeout) throws EventMeshException; - void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException; void close() throws EventMeshException; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java index 924b9ecbe6..f8b6c7fff9 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java @@ -21,12 +21,17 @@ import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; /** * EventMesh TCP subscribe client. + *
    + *
  • {@link org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPSubClient}
  • + *
  • {@link org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPSubClient}
  • + *
  • {@link org.apache.eventmesh.client.tcp.impl.openmessage.OpenMessageTCPSubClient}
  • + *
*/ public interface EventMeshTCPSubClient { + void init() throws EventMeshException; void heartbeat() throws EventMeshException; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 7d47b113ba..a0c1ba85bc 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -113,38 +113,39 @@ public static Package responseToClientAck(Package in) { } public static UserAgent generateSubClient(UserAgent agent) { - UserAgent user = new UserAgent(); - user.setEnv(agent.getEnv()); - user.setHost(agent.getHost()); - user.setPassword(agent.getPassword()); - user.setUsername(agent.getUsername()); - user.setPath(agent.getPath()); - user.setPort(agent.getPort()); - user.setSubsystem(agent.getSubsystem()); - user.setPid(agent.getPid()); - user.setVersion(agent.getVersion()); - user.setIdc(agent.getIdc()); - user.setConsumerGroup(agent.getConsumerGroup()); - user.setProducerGroup(agent.getProducerGroup()); - user.setPurpose(EventMeshCommon.USER_AGENT_PURPOSE_SUB); - return user; + return UserAgent.builder() + .env(agent.getEnv()) + .host(agent.getHost()) + .password(agent.getPassword()) + .username(agent.getUsername()) + .path(agent.getPath()) + .port(agent.getPort()) + .subsystem(agent.getSubsystem()) + .pid(agent.getPid()) + .version(agent.getVersion()) + .idc(agent.getIdc()) + .consumerGroup(agent.getConsumerGroup()) + .producerGroup(agent.getProducerGroup()) + .purpose(EventMeshCommon.USER_AGENT_PURPOSE_SUB) + .build(); } public static UserAgent generatePubClient(UserAgent agent) { - UserAgent user = new UserAgent(); - user.setEnv(agent.getEnv()); - user.setHost(agent.getHost()); - user.setPassword(agent.getPassword()); - user.setUsername(agent.getUsername()); - user.setPath(agent.getPath()); - user.setPort(agent.getPort()); - user.setSubsystem(agent.getSubsystem()); - user.setPid(agent.getPid()); - user.setVersion(agent.getVersion()); - user.setIdc(agent.getIdc()); - user.setProducerGroup(agent.getProducerGroup()); - user.setPurpose(EventMeshCommon.USER_AGENT_PURPOSE_PUB); - return user; + return UserAgent.builder() + .env(agent.getEnv()) + .host(agent.getHost()) + .password(agent.getPassword()) + .username(agent.getUsername()) + .path(agent.getPath()) + .port(agent.getPort()) + .subsystem(agent.getSubsystem()) + .pid(agent.getPid()) + .version(agent.getVersion()) + .idc(agent.getIdc()) + .consumerGroup(agent.getConsumerGroup()) + .producerGroup(agent.getProducerGroup()) + .purpose(EventMeshCommon.USER_AGENT_PURPOSE_PUB) + .build(); } private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshTCPClient.java deleted file mode 100644 index 46c844a5ed..0000000000 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshTCPClient.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.impl; - -import org.apache.eventmesh.client.tcp.EventMeshTCPClient; -import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; -import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; -import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; -import org.apache.eventmesh.client.tcp.common.MessageUtils; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import io.cloudevents.CloudEvent; -import lombok.ToString; - -@ToString -public class DefaultEventMeshTCPClient implements EventMeshTCPClient { - protected UserAgent agent; - private String accessHost; - private int accessPort; - - private EventMeshTCPPubClient pubClient; - private EventMeshTCPSubClient subClient; - - public DefaultEventMeshTCPClient(String accessHost, int accessPort, UserAgent agent) { - this.accessHost = accessHost; - this.accessPort = accessPort; - this.agent = agent; - - UserAgent subAgent = MessageUtils.generateSubClient(agent); - this.subClient = new EventMeshTCPSubClientImpl(accessHost, accessPort, subAgent); - - UserAgent pubAgent = MessageUtils.generatePubClient(agent); - this.pubClient = new EventMeshTCPPubClientImpl(accessHost, accessPort, pubAgent); - } - - public EventMeshTCPPubClient getPubClient() { - return pubClient; - } - - public void setPubClient(EventMeshTCPPubClient pubClient) { - this.pubClient = pubClient; - } - - public EventMeshTCPSubClient getSubClient() { - return subClient; - } - - public void setSubClient(EventMeshTCPSubClient subClient) { - this.subClient = subClient; - } - - public Package rr(Package msg, long timeout) throws EventMeshException { - return this.pubClient.rr(msg, timeout); - } - - public Package publish(Package msg, long timeout) throws EventMeshException { - return this.pubClient.publish(msg, timeout); - } - - @Override - public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { - return this.pubClient.publish(cloudEvent, timeout); - } - - public void broadcast(Package msg, long timeout) throws EventMeshException { - this.pubClient.broadcast(msg, timeout); - } - - @Override - public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException { - this.pubClient.broadcast(cloudEvent, timeout); - } - - public void init() throws EventMeshException { - this.subClient.init(); - this.pubClient.init(); - } - - public void close() { - this.pubClient.close(); - this.subClient.close(); - } - - public void heartbeat() throws EventMeshException { - this.pubClient.heartbeat(); - this.subClient.heartbeat(); - } - - public void listen() throws EventMeshException { - this.subClient.listen(); - } - - @Override - public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) - throws Exception { - this.subClient.subscribe(topic, subscriptionMode, subscriptionType); - } - - @Override - public void unsubscribe() throws EventMeshException { - this.subClient.unsubscribe(); - } - - public void registerSubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { - this.subClient.registerBusiHandler(handler); - } - - @Override - public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException { - this.pubClient.asyncRR(msg, callback, timeout); - } - - public void registerPubBusiHandler(ReceiveMsgHook handler) throws EventMeshException { - this.pubClient.registerBusiHandler(handler); - } -} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPPubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPPubClientImpl.java deleted file mode 100644 index 922a738e8f..0000000000 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPPubClientImpl.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.impl; - -import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient; -import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.MessageUtils; -import org.apache.eventmesh.client.tcp.common.PropertyConst; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.common.RequestContext; -import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import io.cloudevents.CloudEvent; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class EventMeshTCPPubClientImpl extends TcpClient implements EventMeshTCPPubClient { - - private final UserAgent userAgent; - - private ReceiveMsgHook callback; - - private final ConcurrentHashMap callbackConcurrentHashMap = new ConcurrentHashMap<>(); - private ScheduledFuture task; - - public EventMeshTCPPubClientImpl(String accessIp, int port, UserAgent agent) { - super(accessIp, port); - this.userAgent = agent; - } - - public void registerBusiHandler(ReceiveMsgHook handler) { - callback = handler; - } - - public void init() throws Exception { - open(new Handler()); - hello(); - log.info("SimplePubClientImpl|{}|started!", clientNo); - } - - public void reconnect() throws Exception { - super.reconnect(); - hello(); - } - - public void close() throws EventMeshException { - try { - task.cancel(false); - goodbye(); - super.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public void heartbeat() throws Exception { - task = scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!isActive()) { - EventMeshTCPPubClientImpl.this.reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - // ignore - } - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - - private void goodbye() throws Exception { - Package msg = MessageUtils.goodbye(); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - private void hello() throws Exception { - Package msg = MessageUtils.hello(userAgent); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - /** - * Send RR message - * - * @param msg - * @param timeout - * @return - * @throws Exception - */ - public Package rr(Package msg, long timeout) throws Exception { - log.info("SimplePubClientImpl|{}|rr|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); - return io(msg, timeout); - } - - /** - * Asynchronous RR - * - * @param msg - * @param callback - * @param timeout - * @throws Exception - */ - @Override - public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws Exception { - super.send(msg); - this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); - - } - - /** - * Publish message - * - * @param msg - * @throws Exception - */ - public Package publish(Package msg, long timeout) throws Exception { - log.info("SimplePubClientImpl|{}|publish|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); - return io(msg, timeout); - } - - - @Override - public Package publish(CloudEvent cloudEvent, long timeout) throws Exception { - Package msg = MessageUtils.asyncCloudEvent(cloudEvent); - log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", - clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); - return io(MessageUtils.asyncCloudEvent(cloudEvent), timeout); - } - - @Override - public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception { - Package msg = MessageUtils.asyncCloudEvent(cloudEvent); - log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", - clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); - super.send(msg); - } - - /** - * Send broadcast message - * - * @param msg - * @param timeout - * @throws Exception - */ - public void broadcast(Package msg, long timeout) throws Exception { - log - .info("SimplePubClientImpl|{}|broadcast|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); - super.send(msg); - } - - @Override - public UserAgent getUserAgent() { - return userAgent; - } - - @ChannelHandler.Sharable - private class Handler extends SimpleChannelInboundHandler { - @Override - protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { - log.info("SimplePubClientImpl|{}|receive|type={}|msg={}", clientNo, msg.getHeader(), msg); - - Command cmd = msg.getHeader().getCommand(); - if (cmd == Command.RESPONSE_TO_CLIENT) { - if (callback != null) { - callback.handle(msg, ctx); - } - Package pkg = MessageUtils.responseToClientAck(msg); - send(pkg); - } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { - //TODO - } - - RequestContext context = contexts.get(RequestContext._key(msg)); - if (context != null) { - contexts.remove(context.getKey()); - context.finish(msg); - } - } - } - -} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPSubClientImpl.java deleted file mode 100644 index b721d957d0..0000000000 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/EventMeshTCPSubClientImpl.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.client.tcp.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient; -import org.apache.eventmesh.client.tcp.common.EventMeshCommon; -import org.apache.eventmesh.client.tcp.common.MessageUtils; -import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; -import org.apache.eventmesh.client.tcp.common.RequestContext; -import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.common.protocol.SubscriptionType; -import org.apache.eventmesh.common.protocol.SubscriptionItem; -import org.apache.eventmesh.common.protocol.SubscriptionMode; -import org.apache.eventmesh.common.protocol.tcp.*; - -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventMeshTCPSubClientImpl extends TcpClient implements EventMeshTCPSubClient { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private UserAgent userAgent; - - private ReceiveMsgHook callback; - - private List subscriptionItems = new ArrayList<>(); - - private ScheduledFuture task; - - public EventMeshTCPSubClientImpl(String accessIp, int port, UserAgent agent) { - super(accessIp, port); - this.userAgent = agent; - } - - public void registerBusiHandler(ReceiveMsgHook handler) throws Exception { - callback = handler; - } - - public void init() throws Exception { - open(new Handler()); - hello(); - logger.info("SimpleSubClientImpl|{}|started!", clientNo); - } - - public void reconnect() throws Exception { - super.reconnect(); - hello(); - if (!CollectionUtils.isEmpty(subscriptionItems)) { - for (SubscriptionItem item : subscriptionItems) { - Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType()); - this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - } - listen(); - } - - public void close() { - try { - task.cancel(false); - goodbye(); - super.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public void heartbeat() throws Exception { - task = scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!isActive()) { - EventMeshTCPSubClientImpl.this.reconnect(); - } - Package msg = MessageUtils.heartBeat(); - EventMeshTCPSubClientImpl.this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - } - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - - private void goodbye() throws Exception { - Package msg = MessageUtils.goodbye(); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - private void hello() throws Exception { - Package msg = MessageUtils.hello(userAgent); - this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - public void listen() throws Exception { - Package request = MessageUtils.listen(); - this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - - public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws Exception { - subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType)); - Package request = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType); - this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - public void unsubscribe() throws Exception { - Package request = MessageUtils.unsubscribe(); - this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } - - public UserAgent getUserAgent() { - return userAgent; - } - - @ChannelHandler.Sharable - private class Handler extends SimpleChannelInboundHandler { - @SuppressWarnings("Duplicates") - @Override - protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { - Command cmd = msg.getHeader().getCommand(); - logger.info(EventMeshTCPSubClientImpl.class.getSimpleName() + "|receive|type={}|msg={}", cmd, msg); - if (cmd == Command.REQUEST_TO_CLIENT) { - if (callback != null) { - callback.handle(msg, ctx); - } - Package pkg = MessageUtils.requestToClientAck(msg); - send(pkg); - } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.asyncMessageAck(msg); - if (callback != null) { - callback.handle(msg, ctx); - } - send(pkg); - } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.broadcastMessageAck(msg); - if (callback != null) { - callback.handle(msg, ctx); - } - send(pkg); - } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { - //TODO - } else { - logger.error("msg ignored|{}|{}", cmd, msg); - } - RequestContext context = contexts.get(RequestContext._key(msg)); - if (context != null) { - contexts.remove(context.getKey()); - context.finish(msg); - } else { - logger.error("msg ignored,context not found.|{}|{}", cmd, msg); - } - } - } - - @Override - public String toString() { - return "SimpleSubClientImpl|clientNo=" + clientNo + "|" + userAgent; - } -} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index 0c03cb4e76..51341e16e0 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -32,20 +32,15 @@ public void reconnect() throws EventMeshException { } @Override - public Package rr(Package msg, long timeout) throws EventMeshException { + public Package rr(CloudEvent msg, long timeout) throws EventMeshException { return null; } @Override - public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException { + public void asyncRR(CloudEvent msg, AsyncRRCallback callback, long timeout) throws EventMeshException { } - @Override - public Package publish(Package msg, long timeout) throws EventMeshException { - return null; - } - @Override public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException { return null; @@ -56,21 +51,11 @@ public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshExcep } - @Override - public void broadcast(Package msg, long timeout) throws EventMeshException { - - } - @Override public void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException { } - @Override - public UserAgent getUserAgent() { - return null; - } - @Override public void close() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index 9f572c2643..d726c4b90c 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -52,11 +52,6 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event } - @Override - public UserAgent getUserAgent() { - return null; - } - @Override public void close() throws EventMeshException{ diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 4779278192..ba9e0afda4 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -83,9 +83,10 @@ public void reconnect() throws EventMeshException { } @Override - public Package rr(Package msg, long timeout) throws EventMeshException { + public Package rr(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { - log.info("SimplePubClientImpl|{}|rr|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); + Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg); return io(msg, timeout); } catch (Exception ex) { throw new EventMeshException("rr error"); @@ -93,8 +94,9 @@ public Package rr(Package msg, long timeout) throws EventMeshException { } @Override - public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException { + public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, long timeout) throws EventMeshException { try { + Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); super.send(msg); this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback); } catch (Exception ex) { @@ -103,17 +105,6 @@ public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws } } - // todo: remove this method, just keep protocol message publish method - @Override - public Package publish(Package msg, long timeout) throws EventMeshException { - try { - log.info("SimplePubClientImpl|{}|publish|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), msg); - return io(msg, timeout); - } catch (Exception ex) { - throw new EventMeshException("Publish error", ex); - } - } - @Override public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { @@ -132,9 +123,8 @@ public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws E public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { // todo: transform EventMeshMessage to Package - Package msg = MessageUtils.asyncCloudEvent(cloudEvent); - log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", - clientNo, msg.getHeader().getCommand(), + Package msg = MessageUtils.asyncCloudEvent(eventMeshMessage); + log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); super.send(msg); } catch (Exception ex) { @@ -142,18 +132,6 @@ public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws Ev } } - // todo: remove this method - @Override - public void broadcast(Package msg, long timeout) throws EventMeshException { - try { - log.info("SimplePubClientImpl|{}|broadcast|send|type={}|msg={}", clientNo, msg.getHeader().getCommand(), - msg); - super.send(msg); - } catch (Exception ex) { - throw new EventMeshException("Broadcast message error", ex); - } - } - @Override public void registerBusiHandler(ReceiveMsgHook receiveMsgHook) throws EventMeshException { this.callback = receiveMsgHook; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index b5a871d9f2..08aa170083 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -7,7 +7,6 @@ import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig; -import org.apache.eventmesh.client.tcp.impl.EventMeshTCPSubClientImpl; import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionItem; @@ -136,7 +135,7 @@ private class Handler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { Command cmd = msg.getHeader().getCommand(); - log.info(EventMeshTCPSubClientImpl.class.getSimpleName() + "|receive|type={}|msg={}", cmd, msg); + log.info("|receive|type={}|msg={}", cmd, msg); if (cmd == Command.REQUEST_TO_CLIENT) { if (callback != null) { callback.handle(msg, ctx); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java index 47a6c56c42..80e0fbb85a 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java @@ -29,20 +29,15 @@ public void reconnect() throws EventMeshException { } @Override - public Package rr(Package msg, long timeout) throws EventMeshException { + public Package rr(Message msg, long timeout) throws EventMeshException { return null; } @Override - public void asyncRR(Package msg, AsyncRRCallback callback, long timeout) throws EventMeshException { + public void asyncRR(Message msg, AsyncRRCallback callback, long timeout) throws EventMeshException { } - @Override - public Package publish(Package msg, long timeout) throws EventMeshException { - return null; - } - @Override public Package publish(Message cloudEvent, long timeout) throws EventMeshException { return null; @@ -53,21 +48,11 @@ public void broadcast(Message cloudEvent, long timeout) throws EventMeshExceptio } - @Override - public void broadcast(Package msg, long timeout) throws EventMeshException { - - } - @Override public void registerBusiHandler(ReceiveMsgHook handler) throws EventMeshException { } - @Override - public UserAgent getUserAgent() { - return null; - } - @Override public void close() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java index db7396f1e6..c4ae11c0b8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java @@ -48,11 +48,6 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws EventMes } - @Override - public UserAgent getUserAgent() { - return null; - } - @Override public void close() throws EventMeshException {