From c715778e8f1de0db57182cfbf4f2f3c6d94ed34e Mon Sep 17 00:00:00 2001 From: nanoxiong Date: Thu, 17 Jun 2021 15:08:26 +0800 Subject: [PATCH] [ISSUE #366 ] remove custom-format topic concept (#388) * remove custom-format topic concept * remove custom-format topic concept * remove custom-format topic concept * remove custom-format topic concept * remove custom-format topic concept * remove custom-format topic concept * remove custom-format topic concept --- .../org/apache/eventmesh/common/IPUtil.java | 2 +- .../common/protocol/SubcriptionType.java | 26 ++++++- .../common/protocol/SubscriptionItem.java | 71 ++++++++++++++++++ .../common/protocol/SubscriptionMode.java | 45 ++++++++++++ .../body/client/HeartbeatRequestBody.java | 1 - .../http/body/client/RegRequestBody.java | 9 +-- .../body/client/SubscribeRequestBody.java | 15 ++-- .../common/protocol/tcp/Subscription.java | 12 ++-- .../runtime/boot/EventMeshHTTPServer.java | 4 +- .../runtime/boot/EventMeshServer.java | 1 - .../runtime/constants/DeFiBusConstant.java | 72 ------------------- .../consumergroup/ConsumerGroupTopicConf.java | 18 ++++- .../http/consumer/ConsumerGroupManager.java | 6 +- .../http/consumer/ConsumerManager.java | 1 + .../http/consumer/EventMeshConsumer.java | 18 ++--- .../http/consumer/HandleMsgContext.java | 18 ++++- .../http/processor/SubscribeProcessor.java | 23 +++--- .../http/processor/UnSubscribeProcessor.java | 1 + .../http/push/AsyncHTTPPushRequest.java | 3 +- .../tcp/client/group/ClientGroupWrapper.java | 22 +++--- .../group/ClientSessionGroupMapping.java | 12 ++-- .../protocol/tcp/client/session/Session.java | 33 +++++---- .../tcp/client/session/SessionContext.java | 5 +- .../session/push/DownStreamMsgContext.java | 8 ++- .../client/session/push/SessionPusher.java | 13 ++-- .../push/retry/EventMeshTcpRetryer.java | 7 +- .../tcp/client/task/MessageTransferTask.java | 1 + .../tcp/client/task/SubscribeTask.java | 21 +++--- .../tcp/client/task/UnSubscribeTask.java | 8 ++- .../eventmesh/runtime/util/EventMeshUtil.java | 58 --------------- .../src/test/java/client/EventMeshClient.java | 10 ++- .../src/test/java/client/SubClient.java | 10 ++- .../test/java/client/common/MessageUtils.java | 38 +++++----- .../java/client/impl/EventMeshClientImpl.java | 17 ++--- .../test/java/client/impl/SubClientImpl.java | 37 ++++------ .../src/test/java/demo/AsyncSubClient.java | 4 +- .../test/java/demo/BroadCastSubClient.java | 4 +- .../src/test/java/demo/CCSubClient.java | 4 +- .../src/test/java/demo/CClientDemo.java | 6 +- .../src/test/java/demo/SyncSubClient.java | 4 +- .../client/http/consumer/LiteConsumer.java | 29 +++++--- .../eventmesh/client/tcp/EventMeshClient.java | 4 +- .../eventmesh/client/tcp/SimpleSubClient.java | 4 +- .../client/tcp/common/MessageUtils.java | 19 ++--- .../tcp/impl/DefaultEventMeshClient.java | 6 +- .../client/tcp/impl/SimpleSubClientImpl.java | 22 +++--- .../client/tcp/demo/AsyncSubscribe.java | 4 +- .../tcp/demo/AsyncSubscribeBroadcast.java | 4 +- .../client/tcp/demo/SyncResponse.java | 4 +- .../http/demo/sub/service/SubService.java | 12 +++- .../eventmesh/tcp/demo/AsyncSubscribe.java | 4 +- .../tcp/demo/AsyncSubscribeBroadcast.java | 4 +- .../eventmesh/tcp/demo/SyncResponse.java | 4 +- 53 files changed, 436 insertions(+), 352 deletions(-) rename eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java => eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java (67%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java delete mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java index 4cd19e4f3b..5165deb8af 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java @@ -35,7 +35,7 @@ public class IPUtil { public static String getLocalAddress() { // if the progress works under docker environment // return the host ip about this docker located from environment value - String dockerHostIp = System.getenv("webank_docker_host_ip"); + String dockerHostIp = System.getenv("docker_host_ip"); if (dockerHostIp != null && !"".equals(dockerHostIp)) return dockerHostIp; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java similarity index 67% rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java index f06ba0263a..6b6cb16823 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java @@ -15,7 +15,29 @@ * limitations under the License. */ -package org.apache.eventmesh.runtime.core.consumergroup.event; +package org.apache.eventmesh.common.protocol; -public class ConsumerGroupInstanceChangeEvent { +public enum SubcriptionType { + /** + * SYNC + */ + SYNC("SYNC"), + /** + * ASYNC + */ + ASYNC("ASYNC"); + + private String type; + + SubcriptionType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java new file mode 100644 index 0000000000..abcc22c20e --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol; + +public class SubscriptionItem { + + private String topic; + + private SubscriptionMode mode; + + private SubcriptionType type; + + public SubscriptionItem() { + } + + public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) { + this.topic = topic; + this.mode = mode; + this.type = type; + } + + public SubcriptionType getType() { + return type; + } + + public void setType(SubcriptionType type) { + this.type = type; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public SubscriptionMode getMode() { + return mode; + } + + public void setMode(SubscriptionMode mode) { + this.mode = mode; + } + + @Override + public String toString() { + return "SubscriptionItem{" + + "topic=" + topic + + ", mode=" + mode + + ", type=" + type + + '}'; + } +} + + diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java new file mode 100644 index 0000000000..ad4b751a46 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol; + +public enum SubscriptionMode { + + /** + * broadcast + */ + BROADCASTING("BROADCASTING"), + /** + * clustering + */ + CLUSTERING("CLUSTERING"); + + private String mode; + + SubscriptionMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java index 5c3d1ea09e..ace4d719fd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java @@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body { public static final String CLIENTTYPE = "clientType"; public static final String HEARTBEATENTITIES = "heartbeatEntities"; - private String clientType; private List heartbeatEntities; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java index 41c5c5ab98..78bb684cbd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java @@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSONArray; import org.apache.commons.collections4.MapUtils; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.body.Body; public class RegRequestBody extends Body { @@ -39,13 +40,13 @@ public class RegRequestBody extends Body { private String endPoint; - private List topics; + private List topics; - public List getTopics() { + public List getTopics() { return topics; } - public void setTopics(List topics) { + public void setTopics(List topics) { this.topics = topics; } @@ -69,7 +70,7 @@ public static RegRequestBody buildBody(Map bodyParam) { RegRequestBody body = new RegRequestBody(); body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE)); body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT)); - body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class)); + body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class)); return body; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java index 6a37cc53d2..991b83420c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java @@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSONArray; import org.apache.commons.collections4.MapUtils; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.body.Body; public class SubscribeRequestBody extends Body { @@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body { public static final String URL = "url"; - private List topics; + private List topics; - private String url; - - private String topic; - - public List getTopics() { + public List getTopics() { return topics; } - public void setTopics(List topics) { + public void setTopics(List topics) { this.topics = topics; } + private String url; + public String getUrl() { return url; } @@ -58,7 +57,7 @@ public void setUrl(String url) { public static SubscribeRequestBody buildBody(Map bodyParam) { SubscribeRequestBody body = new SubscribeRequestBody(); body.setUrl(MapUtils.getString(bodyParam, URL)); - body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class)); + body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class)); return body; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java index 83b0dc9fcc..3fb74b95b0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java @@ -17,25 +17,27 @@ package org.apache.eventmesh.common.protocol.tcp; +import org.apache.eventmesh.common.protocol.SubscriptionItem; + import java.util.LinkedList; import java.util.List; public class Subscription { - private List topicList = new LinkedList<>(); + private List topicList = new LinkedList<>(); public Subscription() { } - public Subscription(List topicList) { + public Subscription(List topicList) { this.topicList = topicList; } - public List getTopicList() { + public List getTopicList() { return topicList; } - public void setTopicList(List topicList) { + public void setTopicList(List topicList) { this.topicList = topicList; } @@ -45,6 +47,4 @@ public String toString() { "topicList=" + topicList + '}'; } - - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index 80540c0d75..84a2ab79b0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer { private EventMeshHTTPConfiguration eventMeshHttpConfiguration; - public final ConcurrentHashMap localConsumerGroupMapping = new ConcurrentHashMap<>(); + public final ConcurrentHashMap localConsumerGroupMapping = new ConcurrentHashMap<>(); - public final ConcurrentHashMap> localClientInfoMapping = new ConcurrentHashMap<>(); + public final ConcurrentHashMap> localClientInfoMapping = new ConcurrentHashMap<>(); public EventMeshHTTPServer(EventMeshServer eventMeshServer, EventMeshHTTPConfiguration eventMeshHttpConfiguration) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 61bfb89bdb..19d69533ef 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -54,7 +54,6 @@ public void init() throws Exception { String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV)); logger.info("eventstore : {}", eventstore); -// logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName()); serviceState = ServiceState.INITED; logger.info("server state:{}", serviceState); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java deleted file mode 100644 index 898854d770..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to Apache Software Foundation (ASF) under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Apache Software Foundation (ASF) licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.eventmesh.runtime.constants; - -//TODO -public class DeFiBusConstant { - public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; //requester clientId - - public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID"; - - public static final String PROPERTY_MESSAGE_TTL = "TTL"; //timeout for request-response - - public static final String PROPERTY_MESSAGE_CLUSTER = "CLUSTER"; //cluster name - - public static final String PROPERTY_MESSAGE_BROKER = "BROKER"; //broker name where message stored - - public static final String REDIRECT = "REDIRECT"; - - public static final String REDIRECT_FLAG = "REDIRECT_FLAG"; - - public static final String PLUGIN_CLASS_NAME = "org.apache.defibus.broker.plugin.DeFiPluginMessageStore"; - - public static final String RR_REPLY_TOPIC = "rr-reply-topic"; //post fix for reply topic - - public static final String KEY = "msgType"; - - public static final String DEFAULT_TTL = "14400000"; - - public static final String EXT_CONSUMER_GROUP = "ExtConsumerGroup"; - - public static final String RMQ_SYS = "RMQ_SYS_"; - - /** - * msgType1: indicate the msg is broadcast message - */ - public static final String DIRECT = "direct"; - - /** - * msgType2: msg of type except broadcast and reply - */ - public static final String PERSISTENT = "persistent"; - - /** - * msgType3: indicate the msg is which consumer reply to producer - */ - public static final String REPLY = "reply"; - - public static final String INSTANCE_NAME_SEPERATER = "#"; - - public static final String IDC_SEPERATER = "-"; - - public static final String LEAVE_TIME = "LEAVE_TIME"; //leaveBrokerTime - public static final String ARRIVE_TIME = "ARRIVE_TIME"; - public static final String STORE_TIME = "STORE_TIME"; - -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java index 5a16c4c703..f29d90ae67 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java @@ -25,6 +25,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,11 @@ public class ConsumerGroupTopicConf { private String topic; + /** + * @see org.apache.eventmesh.common.protocol.SubscriptionItem + */ + private SubscriptionItem subscriptionItem; + /** * PUSH URL */ @@ -53,12 +59,13 @@ public boolean equals(Object o) { ConsumerGroupTopicConf that = (ConsumerGroupTopicConf) o; return consumerGroup.equals(that.consumerGroup) && Objects.equals(topic, that.topic) && + Objects.equals(subscriptionItem, that.subscriptionItem) && Objects.equals(idcUrls, that.idcUrls); } @Override public int hashCode() { - return Objects.hash(consumerGroup, topic, idcUrls); + return Objects.hash(consumerGroup, topic, subscriptionItem, idcUrls); } @Override @@ -66,6 +73,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("consumeTopicConfig={consumerGroup=").append(consumerGroup) .append(",topic=").append(topic) + .append(",subscriptionMode=").append(subscriptionItem) .append(",idcUrls=").append(idcUrls).append("}"); return sb.toString(); } @@ -86,6 +94,14 @@ public void setTopic(String topic) { this.topic = topic; } + public SubscriptionItem getSubscriptionItem() { + return subscriptionItem; + } + + public void setSubscriptionItem(SubscriptionItem subscriptionItem) { + this.subscriptionItem = subscriptionItem; + } + public Map> getIdcUrls() { return idcUrls; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java index 76381e322e..c73523bdb1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java @@ -17,10 +17,12 @@ package org.apache.eventmesh.runtime.core.protocol.http.consumer; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; +import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; public class ConsumerGroupManager { @@ -52,8 +54,8 @@ public synchronized void start() throws Exception { } private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception { - for (String topic : consumerGroupConfig.getConsumerGroupTopicConf().keySet()) { - eventMeshConsumer.subscribe(topic); + for (Map.Entry conf : consumerGroupConfig.getConsumerGroupTopicConf().entrySet()) { + eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionItem()); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java index 53e8a3c3bd..14486bba19 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java @@ -113,6 +113,7 @@ public void run() { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); latestTopicConf.setConsumerGroup(consumerGroup); latestTopicConf.setTopic(topic); + latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem()); latestTopicConf.setUrls(clientUrls); latestTopicConf.setIdcUrls(idcUrls); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 36dee3c078..e39afb2526 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -33,6 +33,8 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.MeshAsyncConsumeContext; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; @@ -108,9 +110,9 @@ public synchronized void start() throws Exception { started4Broadcast.compareAndSet(false, true); } - public void subscribe(String topic) throws Exception { + public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception { AsyncMessageListener listener = null; - if (!EventMeshUtil.isBroadcast(topic)) { + if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { listener = new AsyncMessageListener() { @Override public void consume(Message message, AsyncConsumeContext context) { @@ -139,7 +141,7 @@ public void consume(Message message, AsyncConsumeContext context) { } } HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); + topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); if (httpMessageHandler.handle(handleMsgContext)) { // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); @@ -188,7 +190,7 @@ public void consume(Message message, AsyncConsumeContext context) { } } HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, - topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); + topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); if (httpMessageHandler.handle(handleMsgContext)) { // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); @@ -210,8 +212,8 @@ public void consume(Message message, AsyncConsumeContext context) { } } - public void unsubscribe(String topic) throws Exception { - if (EventMeshUtil.isBroadcast(topic)) { + public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws Exception { + if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) { broadcastMqConsumer.unsubscribe(topic); } else { persistentMqConsumer.unsubscribe(topic); @@ -234,8 +236,8 @@ public synchronized void shutdown() throws Exception { started4Broadcast.compareAndSet(true, false); } - public void updateOffset(String topic, List msgs, AbstractContext context) { - if (EventMeshUtil.isBroadcast(topic)) { + public void updateOffset(String topic, SubscriptionMode subscriptionMode, List msgs, AbstractContext context) { + if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) { broadcastMqConsumer.updateOffset(msgs, context); } else { persistentMqConsumer.updateOffset(msgs, context); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java index 01768caf8d..2d19174b11 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java @@ -27,6 +27,8 @@ import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; @@ -50,6 +52,8 @@ public class HandleMsgContext { private String topic; + private SubscriptionItem subscriptionItem; + private Message msg; private int ttl; @@ -67,7 +71,7 @@ public class HandleMsgContext { private Map props; public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsumer eventMeshConsumer, - String topic, Message msg, + String topic, Message msg, SubscriptionItem subscriptionItem, AbstractContext context, ConsumerGroupConf consumerGroupConfig, EventMeshHTTPServer eventMeshHTTPServer, String bizSeqNo, String uniqueId, ConsumerGroupTopicConf consumeTopicConfig) { this.msgRandomNo = msgRandomNo; @@ -75,6 +79,7 @@ public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsu this.eventMeshConsumer = eventMeshConsumer; this.topic = topic; this.msg = msg; + this.subscriptionItem = subscriptionItem; this.context = context; this.consumerGroupConfig = consumerGroupConfig; this.eventMeshHTTPServer = eventMeshHTTPServer; @@ -152,6 +157,14 @@ public void setMsg(Message msg) { this.msg = msg; } + public SubscriptionItem getSubscriptionItem() { + return subscriptionItem; + } + + public void setSubscriptionItem(SubscriptionItem subscriptionItem) { + this.subscriptionItem = subscriptionItem; + } + public long getCreateTime() { return createTime; } @@ -188,7 +201,7 @@ public void finish() { // msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER), // msg.getQueueId(), msg.getQueueOffset()); } - eventMeshConsumer.updateOffset(topic, Arrays.asList(msg), context); + eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Arrays.asList(msg), context); } } @@ -214,6 +227,7 @@ public String toString() { sb.append("handleMsgContext={") .append("consumerGroup=").append(consumerGroup) .append(",topic=").append(topic) + .append(",subscriptionItem=").append(subscriptionItem) .append(",consumeTopicConfig=").append(consumeTopicConfig) .append(",bizSeqNo=").append(bizSeqNo) .append(",uniqueId=").append(uniqueId) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index 1d0b6e89fe..2186944bc6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.command.HttpCommand; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody; import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; @@ -98,7 +99,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext.onComplete(responseEventMeshCommand); return; } - List subTopicList = subscribeRequestBody.getTopics(); + List subTopicList = subscribeRequestBody.getTopics(); String url = subscribeRequestBody.getUrl(); String consumerGroup = EventMeshUtil.buildClientGroup(subscribeRequestHeader.getSys(), @@ -108,8 +109,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url); - for (String subTopic : subTopicList) { - List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic); + for (SubscriptionItem subTopic : subTopicList) { + List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic.getTopic()); if (CollectionUtils.isEmpty(groupTopicClients)) { httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic); @@ -131,23 +132,25 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext consumerGroupConf = new ConsumerGroupConf(consumerGroup); ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf(); consumeTopicConfig.setConsumerGroup(consumerGroup); - consumeTopicConfig.setTopic(subTopic); + consumeTopicConfig.setTopic(subTopic.getTopic()); + consumeTopicConfig.setSubscriptionItem(subTopic); consumeTopicConfig.setUrls(new HashSet<>(Arrays.asList(url))); consumeTopicConfig.setIdcUrls(idcUrls); Map map = new HashMap<>(); - map.put(subTopic, consumeTopicConfig); + map.put(subTopic.getTopic(), consumeTopicConfig); consumerGroupConf.setConsumerGroupTopicConf(map); } else { // 已有订阅 Map map = consumerGroupConf.getConsumerGroupTopicConf(); for (String key : map.keySet()) { - if (StringUtils.equals(subTopic, key)) { + if (StringUtils.equals(subTopic.getTopic(), key)) { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); ConsumerGroupTopicConf currentTopicConf = map.get(key); latestTopicConf.setConsumerGroup(consumerGroup); - latestTopicConf.setTopic(subTopic); + latestTopicConf.setTopic(subTopic.getTopic()); + latestTopicConf.setSubscriptionItem(subTopic); latestTopicConf.setUrls(new HashSet<>(Arrays.asList(url))); latestTopicConf.getUrls().addAll(currentTopicConf.getUrls()); @@ -206,8 +209,8 @@ public boolean rejectRequest() { } private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup, - List topicList, String url) { - for(String topic: topicList) { + List subscriptionItems, String url) { + for(SubscriptionItem item: subscriptionItems) { Client client = new Client(); client.env = subscribeRequestHeader.getEnv(); client.dcn = subscribeRequestHeader.getDcn(); @@ -216,7 +219,7 @@ private void registerClient(SubscribeRequestHeader subscribeRequestHeader, Strin client.ip = subscribeRequestHeader.getIp(); client.pid = subscribeRequestHeader.getPid(); client.consumerGroup = consumerGroup; - client.topic = topic; + client.topic = item.getTopic(); client.url = url; client.lastUpTime = new Date(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java index 4d9f2e25fa..b723054522 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java @@ -166,6 +166,7 @@ public void onResponse(HttpCommand httpCommand) { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); latestTopicConf.setConsumerGroup(consumerGroup); latestTopicConf.setTopic(unSubTopic); + latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem()); latestTopicConf.setUrls(clientUrls); latestTopicConf.setIdcUrls(idcUrls); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index af71ce837b..831296a342 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -35,6 +35,7 @@ import org.apache.commons.text.RandomStringGenerator; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.IPUtil; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -88,7 +89,7 @@ public void tryHTTPRequest() { String requestCode = ""; - if (EventMeshUtil.isService(handleMsgContext.getTopic())) { + if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode()); } else { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 0bc645ecc5..2c3a46fed6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -43,6 +43,8 @@ import org.apache.eventmesh.api.MeshAsyncConsumeContext; import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; @@ -511,9 +513,9 @@ public synchronized void startClientGroupBroadcastConsumer() throws Exception { logger.info("starting broadCastMsgConsumer success, group:{}", groupName); } - public void subscribe(String topic) throws Exception { + public void subscribe(SubscriptionItem subscriptionItem) throws Exception { AsyncMessageListener listener = null; - if (EventMeshUtil.isBroadcast(topic)) { + if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { listener = new AsyncMessageListener() { @Override public void consume(Message message, AsyncConsumeContext context) { @@ -534,7 +536,7 @@ public void consume(Message message, AsyncConsumeContext context) { Iterator sessionsItr = groupConsumerSessions.iterator(); DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false); + new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem); while (sessionsItr.hasNext()) { Session session = sessionsItr.next(); @@ -562,7 +564,7 @@ public void run() { context.commit(Action.CommitMessage); } }; - broadCastMsgConsumer.subscribe(topic, listener); + broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener); } else { listener = new AsyncMessageListener() { @Override @@ -606,7 +608,7 @@ public void consume(Message message, AsyncConsumeContext context) { } DownStreamMsgContext downStreamMsgContext = - new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false); + new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem); //msg put in eventmesh,waiting client ack session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); session.downstreamMsg(downStreamMsgContext); @@ -615,15 +617,15 @@ public void consume(Message message, AsyncConsumeContext context) { context.commit(Action.CommitMessage); } }; - persistentMsgConsumer.subscribe(topic, listener); + persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener); } } - public void unsubscribe(String topic) throws Exception { - if (EventMeshUtil.isBroadcast(topic)) { - broadCastMsgConsumer.unsubscribe(topic); + public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception { + if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { + broadCastMsgConsumer.unsubscribe(subscriptionItem.getTopic()); } else { - persistentMsgConsumer.unsubscribe(topic); + persistentMsgConsumer.unsubscribe(subscriptionItem.getTopic()); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index c45c545a08..c4cd17005e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -34,6 +34,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; @@ -280,10 +282,10 @@ private void cleanClientGroupWrapperByClosePub(Session session) throws Exception * @param session */ private void cleanSubscriptionInSession(Session session) throws Exception { - for (String topic : session.getSessionContext().subscribeTopics.values()) { - session.getClientGroupWrapper().get().removeSubscription(topic, session); - if (!session.getClientGroupWrapper().get().hasSubscription(topic)) { - session.getClientGroupWrapper().get().unsubscribe(topic); + for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) { + session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session); + if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) { + session.getClientGroupWrapper().get().unsubscribe(item); } } } @@ -298,7 +300,7 @@ private void handleUnackMsgsInSession(Session session) { if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) { for (Map.Entry entry : unAckMsg.entrySet()) { DownStreamMsgContext downStreamMsgContext = entry.getValue(); - if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) { + if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt), session.getClient()); continue; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java index 4ff0480386..fca8d2aed7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java @@ -32,10 +32,9 @@ import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.OPStatus; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper; @@ -162,27 +161,27 @@ public void setListenRequestSeq(String listenRequestSeq) { this.listenRequestSeq = listenRequestSeq; } - public void subscribe(List topics) throws Exception { - for (String topic : topics) { - sessionContext.subscribeTopics.putIfAbsent(topic, topic); - clientGroupWrapper.get().subscribe(topic); + public void subscribe(List items) throws Exception { + for (SubscriptionItem item : items) { + sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item); + clientGroupWrapper.get().subscribe(item); - clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(topic, + clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(item.getTopic(), EventMeshConstants.DEFAULT_TIME_OUT_MILLS); - clientGroupWrapper.get().addSubscription(topic, this); - subscribeLogger.info("subscribe|succeed|topic={}|user={}", topic, client); + clientGroupWrapper.get().addSubscription(item.getTopic(), this); + subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client); } } - public void unsubscribe(List topics) throws Exception { - for (String topic : topics) { - sessionContext.subscribeTopics.remove(topic); - clientGroupWrapper.get().removeSubscription(topic, this); + public void unsubscribe(List items) throws Exception { + for (SubscriptionItem item : items) { + sessionContext.subscribeTopics.remove(item.getTopic()); + clientGroupWrapper.get().removeSubscription(item.getTopic(), this); - if (!clientGroupWrapper.get().hasSubscription(topic)) { - clientGroupWrapper.get().unsubscribe(topic); - subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", topic, client); + if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) { + clientGroupWrapper.get().unsubscribe(item); + subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client); } } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java index 9d5b22e639..e8f851baec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.time.DateFormatUtils; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.runtime.constants.EventMeshConstants; public class SessionContext { @@ -28,7 +29,7 @@ public class SessionContext { public ConcurrentHashMap sendTopics = new ConcurrentHashMap(); - public ConcurrentHashMap subscribeTopics = new ConcurrentHashMap(); + public ConcurrentHashMap subscribeTopics = new ConcurrentHashMap(); public long createTime = System.currentTimeMillis(); @@ -38,7 +39,7 @@ public SessionContext(Session session) { @Override public String toString() { - return "SessionContext{subscribeTopics=" + subscribeTopics.keySet() + return "SessionContext{subscribeTopics=" + subscribeTopics + ",sendTopics=" + sendTopics.keySet() + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}"; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java index 01ba40e416..8e441bd5aa 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java @@ -28,6 +28,8 @@ import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; @@ -51,6 +53,8 @@ public class DownStreamMsgContext implements Delayed { public int retryTimes; + public SubscriptionItem subscriptionItem; + private long executeTime; public long lastPushTime; @@ -61,7 +65,7 @@ public class DownStreamMsgContext implements Delayed { public boolean msgFromOtherEventMesh; - public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh) { + public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh, SubscriptionItem subscriptionItem) { this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet()); this.msgExt = msgExt; this.session = session; @@ -71,6 +75,7 @@ public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper c this.lastPushTime = System.currentTimeMillis(); this.executeTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis(); + this.subscriptionItem = subscriptionItem; String ttlStr = msgExt.getUserProperties("TTL"); long ttl = StringUtils.isNumeric(ttlStr) ? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS; this.expireTime = System.currentTimeMillis() + ttl; @@ -108,6 +113,7 @@ public String toString() { ",consumer=" + consumer + // todo ",consumerGroup=" + consumer.getClass().getConsumerGroup() + ",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION) + + ",subscriptionItem=" + subscriptionItem + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT) + ",lastPushTime=" + DateFormatUtils.format(lastPushTime, EventMeshConstants.DATE_FORMAT) + '}'; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java index b1171f2241..12c7cd14e7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java @@ -21,10 +21,9 @@ import io.netty.channel.ChannelFutureListener; import org.apache.commons.collections4.CollectionUtils; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.OPStatus; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; @@ -63,9 +62,9 @@ public String toString() { public void push(final DownStreamMsgContext downStreamMsgContext) { Command cmd; - if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) { + if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { cmd = Command.BROADCAST_MESSAGE_TO_CLIENT; - } else if (EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) { + } else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) { cmd = Command.REQUEST_TO_CLIENT; } else { cmd = Command.ASYNC_MESSAGE_TO_CLIENT; @@ -102,7 +101,7 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime); //retry - long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills; + long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills; downStreamMsgContext.delay(delayTime); session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext); } else { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java index fc619b3d87..b8fcf0fd9f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java @@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; @@ -74,7 +76,7 @@ public void pushRetry(DownStreamMsgContext downStreamMsgContext) { return; } - int maxRetryTimes = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes; + int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes; if (downStreamMsgContext.retryTimes >= maxRetryTimes) { logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.retryTimes, downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); @@ -118,7 +120,7 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext) { Session rechoosen = null; String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); - if (!EventMeshUtil.isBroadcast(topic)) { + if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { rechoosen = downStreamMsgContext.session.getClientGroupWrapper() .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getGroupName() , topic @@ -127,7 +129,6 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext) { rechoosen = downStreamMsgContext.session; } - if (rechoosen == null) { logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index c903742cdd..2d4b80dafd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -85,6 +85,7 @@ public void operationComplete(ChannelFuture future) throws Exception { long sendTime = System.currentTimeMillis(); addTimestamp(eventMeshMessage, cmd, sendTime); if (cmd.equals(Command.REQUEST_TO_SERVER)) { + //Message Attach SYNC eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper() .get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId()); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java index e26c66744d..ccacf56b2d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java @@ -22,13 +22,11 @@ import io.netty.channel.ChannelHandlerContext; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.OPStatus; -import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.Subscription; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.tcp.*; +import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; -import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,17 +49,14 @@ public void run() { throw new Exception("subscriptionInfo is null"); } - List topicList = new ArrayList<>(); + List subscriptionItems = new ArrayList<>(); for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) { - String topic = subscriptionInfo.getTopicList().get(i); - if (!EventMeshUtil.isValidRMBTopic(topic)) { - throw new Exception("invalid topic!"); - } - topicList.add(topic); + SubscriptionItem item = subscriptionInfo.getTopicList().get(i); + subscriptionItems.add(item); } synchronized (session) { - session.subscribe(topicList); - messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), topicList); + session.subscribe(subscriptionItems); + messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems); } msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader() .getSeq())); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java index 3a338edcd6..c35245adad 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java @@ -19,10 +19,12 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.collections4.MapUtils; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.OPStatus; @@ -46,10 +48,10 @@ public void run() { Package msg = new Package(); try { synchronized (session) { - List topics = new ArrayList(); + List topics = new ArrayList(); if (MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) { - for (String topic : session.getSessionContext().subscribeTopics.keySet()) { - topics.add(topic); + for (Map.Entry entry : session.getSessionContext().subscribeTopics.entrySet()) { + topics.add(entry.getValue()); } session.unsubscribe(topics); messageLogger.info("UnSubscriberTask succeed|user={}|topics={}", session.getClient(), topics); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 3814fb7de5..0ecb375f1c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -115,64 +115,6 @@ public static String buildCCAddr(String str, String idc) { return str + "/namesrvAddr/" + idc; } - public static boolean isValidRMBTopic(String topic) { - if (StringUtils.isEmpty(topic) || !StringUtils.contains(topic, "-")) { - return false; - } - - String[] args = StringUtils.split(topic, "-"); - if (ArrayUtils.getLength(args) != 5) { - return false; - } - - String s0e = args[1]; - if (!StringUtils.equalsIgnoreCase("s", s0e) && !StringUtils.equalsIgnoreCase("e", s0e)) { - return false; - } - - String service = args[2]; - if (!StringUtils.isNumeric(service)) { - return false; - } - - return true; - } - - public static String getServiceIDStr(String topic) { - if (!isValidRMBTopic(topic)) { - return ""; - } - - String[] args = StringUtils.split(topic, "-"); - return args[2]; - } - - public static String getPidStr(String topic) { - if (!isValidRMBTopic(topic)) { - return ""; - } - - String[] args = StringUtils.split(topic, "-"); - return args[3]; - } - - public static boolean isService(String topic) { - String serviceStr = getServiceIDStr(topic); - if (StringUtils.isEmpty(serviceStr)) { - return false; - } - return "0".equals(StringUtils.substring(serviceStr, 3, 4)); - } - - public static boolean isBroadcast(String topic) { - String serviceStr = getServiceIDStr(topic); - if (StringUtils.isEmpty(serviceStr)) { - return false; - } - String[] args = StringUtils.split(topic, "-"); - return "3".equals(StringUtils.substring(args[2], 3, 4)) || "4".equals(StringUtils.substring(args[2], 3, 4)); - } - /** * 自定义取堆栈 * diff --git a/eventmesh-runtime/src/test/java/client/EventMeshClient.java b/eventmesh-runtime/src/test/java/client/EventMeshClient.java index ce4a13ed4a..547bf61f6b 100644 --- a/eventmesh-runtime/src/test/java/client/EventMeshClient.java +++ b/eventmesh-runtime/src/test/java/client/EventMeshClient.java @@ -17,9 +17,11 @@ package client; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; import client.hook.ReceiveMsgHook; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public interface EventMeshClient { @@ -37,13 +39,9 @@ public interface EventMeshClient { Package listen() throws Exception; - Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception; + Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; - Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception; - - Package justSubscribe(String topic) throws Exception; - - Package justUnsubscribe(String topic) throws Exception; + Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception; diff --git a/eventmesh-runtime/src/test/java/client/SubClient.java b/eventmesh-runtime/src/test/java/client/SubClient.java index dff635ac0d..e591ea67e3 100644 --- a/eventmesh-runtime/src/test/java/client/SubClient.java +++ b/eventmesh-runtime/src/test/java/client/SubClient.java @@ -17,7 +17,9 @@ package client; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import client.hook.ReceiveMsgHook; @@ -32,13 +34,9 @@ public interface SubClient { void reconnect() throws Exception; - Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception; + Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; - Package justSubscribe(String topic) throws Exception; - - Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception; - - Package justUnsubscribe(String topic) throws Exception; + Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; Package listen() throws Exception; diff --git a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java index 0eb313ed3e..c1cc9846df 100644 --- a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java +++ b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java @@ -21,12 +21,12 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.*; +import org.apache.eventmesh.common.protocol.tcp.Package; public class MessageUtils { public static int seqLength = 10; @@ -63,10 +63,10 @@ public static Package subscribe() { return msg; } - public static Package subscribe(String topic) { + public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic)); + msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); return msg; } @@ -76,10 +76,10 @@ public static Package unsubscribe() { return msg; } - public static Package unsubscribe(String topic) { + public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic)); + msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); return msg; } @@ -170,20 +170,20 @@ public static UserAgent generateSubServer() { public static Subscription generateSubscription() { Subscription subscription = new Subscription(); - List topicList = new ArrayList<>(); - topicList.add("FT0-s-80000000-01-0"); - topicList.add("FT0-s-80000000-02-0"); - topicList.add("FT0-s-80000000-03-0"); - topicList.add("FT0-s-80000000-04-0"); - subscription.setTopicList(topicList); + List subscriptionItems = new ArrayList<>(); + subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-02-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-03-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); + subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-04-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC)); + subscription.setTopicList(subscriptionItems); return subscription; } - public static Subscription generateSubscription(String topic) { + public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { Subscription subscription = new Subscription(); - List topicList = new ArrayList<>(); - topicList.add(topic); - subscription.setTopicList(topicList); + List subscriptionItems = new ArrayList<>(); + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + subscription.setTopicList(subscriptionItems); return subscription; } diff --git a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java index 5f58c1e54f..3913bcdd01 100644 --- a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java +++ b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java @@ -17,7 +17,9 @@ package client.impl; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import client.EventMeshClient; @@ -76,26 +78,19 @@ public void heartbeat() throws Exception { this.subClient.heartbeat(); } - public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception { - return this.subClient.justSubscribe(serviceId, scenario, dcn); - } public Package listen() throws Exception { return this.subClient.listen(); } - public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception { - return this.subClient.justUnsubscribe(serviceId, scenario, dcn); - } - @Override - public Package justSubscribe(String topic) throws Exception { - return this.subClient.justSubscribe(topic); + public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType); } @Override - public Package justUnsubscribe(String topic) throws Exception { - return this.subClient.justUnsubscribe(topic); + public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType); } diff --git a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java index 3a61b48683..e77171558c 100644 --- a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java +++ b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java @@ -27,10 +27,11 @@ import io.netty.channel.SimpleChannelInboundHandler; import org.apache.commons.collections4.CollectionUtils; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.OPStatus; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class SubClientImpl extends TCPClient implements SubClient { private ReceiveMsgHook callback; - private List topics = new ArrayList(); + private List subscriptionItems = new ArrayList(); private ScheduledFuture task; @@ -72,9 +73,9 @@ public void init() throws Exception { public void reconnect() throws Exception { super.reconnect(); hello(); - if (!CollectionUtils.isEmpty(topics)) { - for (String topic : topics) { - Package request = MessageUtils.subscribe(topic); + if (!CollectionUtils.isEmpty(subscriptionItems)) { + for (SubscriptionItem item : subscriptionItems) { + Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType()); this.dispatcher(request, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } } @@ -117,14 +118,9 @@ private void hello() throws Exception { this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } - public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception { - Package msg = MessageUtils.subscribe(); - return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); - } - - public Package justSubscribe(String topic) throws Exception { - topics.add(topic); - Package msg = MessageUtils.subscribe(topic); + public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType); return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } @@ -144,14 +140,9 @@ public Package listen() throws Exception { // this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); // } - public Package justUnsubscribe(String topic) throws Exception { - topics.remove(topic); - Package msg = MessageUtils.unsubscribe(topic); - return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); - } - - public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception { - Package msg = MessageUtils.unsubscribe(); + public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + subscriptionItems.remove(topic); + Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType); return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } diff --git a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java index eb55d0ec7a..fc479ca75e 100644 --- a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java +++ b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -26,13 +27,14 @@ import client.common.MessageUtils; import client.hook.ReceiveMsgHook; import client.impl.SubClientImpl; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public class AsyncSubClient { public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.ASYNC_TOPIC); + client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java index f3f29f666a..3d9385d750 100644 --- a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java +++ b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -27,13 +28,14 @@ import client.common.MessageUtils; import client.hook.ReceiveMsgHook; import client.impl.SubClientImpl; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public class BroadCastSubClient { public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.BROADCAST_TOPIC); + client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/demo/CCSubClient.java index d415743cf3..935a554878 100644 --- a/eventmesh-runtime/src/test/java/demo/CCSubClient.java +++ b/eventmesh-runtime/src/test/java/demo/CCSubClient.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -26,6 +27,7 @@ import client.common.UserAgentUtils; import client.hook.ReceiveMsgHook; import client.impl.SubClientImpl; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public class CCSubClient { @@ -34,7 +36,7 @@ public static void main(String[] args) throws Exception { subClient.init(); subClient.heartbeat(); subClient.listen(); - subClient.justSubscribe("FT0-s-80000000-01-0"); + subClient.justSubscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); subClient.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-runtime/src/test/java/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/demo/CClientDemo.java index 590aa3fe2c..61c4f52815 100644 --- a/eventmesh-runtime/src/test/java/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/demo/CClientDemo.java @@ -19,8 +19,10 @@ import io.netty.channel.ChannelHandlerContext; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +46,8 @@ public static void main(String[] args) throws Exception { EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000); client.init(); client.heartbeat(); - client.justSubscribe(ASYNC_TOPIC); - client.justSubscribe(BROADCAST_TOPIC); + client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); + client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); client.listen(); // for (int i = 0; i < 10000; i++) { // Package rr = null; diff --git a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java index 10e13c4ae8..9bc11415a3 100644 --- a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java +++ b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -26,13 +27,14 @@ import client.common.MessageUtils; import client.hook.ReceiveMsgHook; import client.impl.SubClientImpl; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public class SyncSubClient { public static void main(String[] args) throws Exception { SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer()); client.init(); client.heartbeat(); - client.justSubscribe(ClientConstants.SYNC_TOPIC); + client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); client.registerBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index ede5ef0743..55b7e1647e 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -17,8 +17,7 @@ package org.apache.eventmesh.client.http.consumer; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -45,6 +44,7 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.ThreadPoolFactory; +import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody; import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientType; @@ -52,6 +52,7 @@ import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; import org.apache.eventmesh.common.protocol.http.common.RequestCode; +import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; @@ -67,7 +68,7 @@ public class LiteConsumer extends AbstractLiteClient { protected LiteClientConfig eventMeshClientConfig; - private List subscription = Lists.newArrayList(); + private List subscription = Lists.newArrayList(); private LiteMessageListener messageListener; @@ -116,7 +117,7 @@ public void shutdown() throws Exception { logger.info("LiteConsumer shutdown"); } - public boolean subscribe(List topicList, String url) throws Exception { + public boolean subscribe(List topicList, String url) throws Exception { subscription.addAll(topicList); if (!started.get()) { start(); @@ -146,7 +147,7 @@ public boolean subscribe(List topicList, String url) throws Exception { } - private RequestParam generateSubscribeRequestParam(List topicList, String url) { + private RequestParam generateSubscribeRequestParam(List topicList, String url) { // final LiteMessage liteMessage = new LiteMessage(); // liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("subscribe message") @@ -170,11 +171,11 @@ private RequestParam generateSubscribeRequestParam(List topicList, Strin return requestParam; } - private RequestParam generateHeartBeatRequestParam(List topics, String url) { + private RequestParam generateHeartBeatRequestParam(List topics, String url) { List heartbeatEntities = new ArrayList<>(); - for (String topic : topics) { + for (SubscriptionItem item : topics) { HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity(); - heartbeatEntity.topic = topic; + heartbeatEntity.topic = item.getTopic(); heartbeatEntity.url = url; heartbeatEntities.add(heartbeatEntity); } @@ -198,7 +199,7 @@ private RequestParam generateHeartBeatRequestParam(List topics, String u return requestParam; } - public void heartBeat(List topicList, String url) throws Exception { + public void heartBeat(List topicList, String url) throws Exception { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -234,7 +235,15 @@ public void run() { } public boolean unsubscribe(List topicList, String url) throws Exception { - subscription.removeAll(topicList); + Set unSub = new HashSet<>(topicList); + Iterator itr = subscription.iterator(); + while(itr.hasNext()) { + SubscriptionItem item = itr.next(); + if (unSub.contains(item.getTopic())) { + itr.remove(); + } + } + RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url); long startTime = System.currentTimeMillis(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java index 4e11260528..afdb90ac5f 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java @@ -19,7 +19,9 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; public interface EventMeshClient { @@ -39,7 +41,7 @@ public interface EventMeshClient { void listen() throws Exception; - void subscribe(String topic) throws Exception; + void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; void unsubscribe() throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java index 251fd93b27..3803edcb07 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java @@ -19,6 +19,8 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; public interface SimpleSubClient { @@ -30,7 +32,7 @@ public interface SimpleSubClient { void reconnect() throws Exception; - void subscribe(String topic) throws Exception; + void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception; void unsubscribe() throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 4122a234a0..87ef68a743 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -21,10 +21,11 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.common.protocol.tcp.Package; public class MessageUtils { @@ -55,10 +56,10 @@ public static Package listen() { return msg; } - public static Package subscribe(String topic) { + public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { Package msg = new Package(); msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength))); - msg.setBody(generateSubscription(topic)); + msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType)); return msg; } @@ -130,11 +131,11 @@ public static UserAgent generatePubClient(UserAgent agent) { return user; } - private static Subscription generateSubscription(String topic) { + private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) { Subscription subscription = new Subscription(); - List topicList = new ArrayList<>(); - topicList.add(topic); - subscription.setTopicList(topicList); + List subscriptionItems = new ArrayList<>(); + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + subscription.setTopicList(subscriptionItems); return subscription; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java index 4cc14eb46a..8e645fa35f 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java @@ -24,7 +24,9 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; public class DefaultEventMeshClient implements EventMeshClient { @@ -95,8 +97,8 @@ public void listen() throws Exception { } @Override - public void subscribe(String topic) throws Exception { - this.subClient.subscribe(topic); + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + this.subClient.subscribe(topic, subscriptionMode, subcriptionType); } @Override diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java index 0579184156..38d52f653d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java @@ -33,10 +33,12 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.*; +import org.apache.eventmesh.common.protocol.tcp.Package; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +50,7 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient { private ReceiveMsgHook callback; - private List topics = new ArrayList(); + private List subscriptionItems = new ArrayList(); private ScheduledFuture task; @@ -70,9 +72,9 @@ public void init() throws Exception { public void reconnect() throws Exception { super.reconnect(); hello(); - if (!CollectionUtils.isEmpty(topics)) { - for (String topic : topics) { - Package request = MessageUtils.subscribe(topic); + if (!CollectionUtils.isEmpty(subscriptionItems)) { + for (SubscriptionItem item : subscriptionItems) { + Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType()); this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } } @@ -121,9 +123,9 @@ public void listen() throws Exception { } - public void subscribe(String topic) throws Exception { - topics.add(topic); - Package request = MessageUtils.subscribe(topic); + public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception { + subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType)); + Package request = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType); this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java index 319e428772..12da8465bf 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java @@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -45,7 +47,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-e-80010000-01-1"); + client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java index 8b77e1a3a9..e21ee450be 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java @@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -45,7 +47,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-e-80030001-01-3"); + client.subscribe("FT0-e-80030001-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java index b4fbbf71af..756150a5ac 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java @@ -23,6 +23,8 @@ import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.protocol.tcp.Package; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-s-80000000-01-0"); + client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); //同步RR消息 client.registerSubBusiHandler(handler); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index ed876884fb..3fcfa3f73a 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -19,6 +19,7 @@ package org.apache.eventmesh.http.demo.sub.service; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -29,6 +30,9 @@ import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.common.protocol.SubcriptionType; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; @@ -48,7 +52,7 @@ public class SubService implements InitializingBean { final Properties properties = Utils.readPropertiesFile("application.properties"); - final List topicList = Arrays.asList("FT0-e-80010001-01-1"); + final List topicList = Arrays.asList(new SubscriptionItem("FT0-e-80010001-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC)); final String localIp = IPUtil.getLocalAddress(); final String localPort = properties.getProperty("server.port"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -100,7 +104,11 @@ public void afterPropertiesSet() throws Exception { public void cleanup() { logger.info("start destory ...."); try { - liteConsumer.unsubscribe(topicList, url); + List unSubList = new ArrayList<>(); + for (SubscriptionItem item:topicList) { + unSubList.add(item.getTopic()); + } + liteConsumer.unsubscribe(unSubList, url); } catch (Exception e) { e.printStackTrace(); } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java index 1f7d3c89b3..4cde0ee804 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java @@ -24,8 +24,10 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; @@ -50,7 +52,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-e-80010000-01-1"); + client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java index 74bdaaa212..cf2323a2a4 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java @@ -24,8 +24,10 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; @@ -50,7 +52,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-e-80030000-01-3"); + client.subscribe("FT0-e-80030000-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java index 969163d44b..c4cb72e305 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java @@ -22,7 +22,9 @@ import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; +import org.apache.eventmesh.common.protocol.SubcriptionType; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public static void main(String[] agrs) throws Exception { client.init(); client.heartbeat(); - client.subscribe("FT0-s-80000000-01-0"); + client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC); //同步RR消息 client.registerSubBusiHandler(handler);