diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java index 467e4f0cb5..53e8a3c3bd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java @@ -156,19 +156,19 @@ public void run() { public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig, ConcurrentHashMap localConsumerGroupMapping) throws Exception { ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup); - if (cgm == null) { + if (latestConsumerGroupConfig == null) { ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent(); - notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW; + notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE; notification.consumerGroup = consumerGroup; - notification.consumerGroupConfig = latestConsumerGroupConfig; eventMeshHTTPServer.getEventBus().post(notification); return; } - if (latestConsumerGroupConfig == null) { + if (cgm == null) { ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent(); - notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE; + notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW; notification.consumerGroup = consumerGroup; + notification.consumerGroupConfig = latestConsumerGroupConfig; eventMeshHTTPServer.getEventBus().post(notification); return; } @@ -217,8 +217,10 @@ public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf con * restart consumer */ public synchronized void restartConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception { - ConsumerGroupManager cgm = consumerTable.get(consumerGroup); - cgm.refresh(consumerGroupConfig); + if(consumerTable.containsKey(consumerGroup)) { + ConsumerGroupManager cgm = consumerTable.get(consumerGroup); + cgm.refresh(consumerGroupConfig); + } } /** @@ -235,8 +237,10 @@ public ConsumerGroupManager getConsumer(String consumerGroup) throws Exception { * @param consumerGroup */ public synchronized void delConsumer(String consumerGroup) throws Exception { - ConsumerGroupManager cgm = consumerTable.remove(consumerGroup); - cgm.shutdown(); + if(consumerTable.containsKey(consumerGroup)) { + ConsumerGroupManager cgm = consumerTable.remove(consumerGroup); + cgm.shutdown(); + } } @Subscribe diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index 54fa73edde..1d0b6e89fe 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -19,15 +19,13 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; - import com.alibaba.fastjson.JSONObject; - import io.netty.channel.ChannelHandlerContext; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.IPUtil; @@ -108,6 +106,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext synchronized (eventMeshHTTPServer.localClientInfoMapping) { + registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url); for (String subTopic : subTopicList) { List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic); @@ -206,4 +205,41 @@ public boolean rejectRequest() { return false; } + private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup, + List topicList, String url) { + for(String topic: topicList) { + Client client = new Client(); + client.env = subscribeRequestHeader.getEnv(); + client.dcn = subscribeRequestHeader.getDcn(); + client.idc = subscribeRequestHeader.getIdc(); + client.sys = subscribeRequestHeader.getSys(); + client.ip = subscribeRequestHeader.getIp(); + client.pid = subscribeRequestHeader.getPid(); + client.consumerGroup = consumerGroup; + client.topic = topic; + client.url = url; + client.lastUpTime = new Date(); + + String groupTopicKey = client.consumerGroup + "@" + client.topic; + + if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) { + List localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); + boolean isContains = false; + for (Client localClient : localClients) { + if (StringUtils.equals(localClient.url, client.url)) { + isContains = true; + localClient.lastUpTime = client.lastUpTime; + break; + } + } + if (!isContains) { + localClients.add(client); + } + } else { + List clients = new ArrayList<>(); + clients.add(client); + eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients); + } + } + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java index e8707df9d2..4d9f2e25fa 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java @@ -18,18 +18,15 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import com.alibaba.fastjson.JSONObject; - import io.netty.channel.ChannelHandlerContext; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.IPUtil; @@ -44,10 +41,8 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; -import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -131,12 +126,15 @@ public void onResponse(HttpCommand httpCommand) { synchronized (eventMeshHTTPServer.localClientInfoMapping) { boolean isChange = true; + + registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl); + for (String unSubTopic : unSubTopicList) { List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + unSubTopic); Iterator clientIterator = groupTopicClients.iterator(); while (clientIterator.hasNext()) { Client client = clientIterator.next(); - if (StringUtils.equals(client.ip, ip)) { + if (StringUtils.equals(client.pid, pid) && StringUtils.equals(client.url, unSubscribeUrl)) { httpLogger.warn("client {} start unsubscribe", JSONObject.toJSONString(client)); clientIterator.remove(); } @@ -239,4 +237,41 @@ public void onResponse(HttpCommand httpCommand) { public boolean rejectRequest() { return false; } + + private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader, String consumerGroup, + List topicList, String url) { + for(String topic: topicList) { + Client client = new Client(); + client.env = unSubscribeRequestHeader.getEnv(); + client.dcn = unSubscribeRequestHeader.getDcn(); + client.idc = unSubscribeRequestHeader.getIdc(); + client.sys = unSubscribeRequestHeader.getSys(); + client.ip = unSubscribeRequestHeader.getIp(); + client.pid = unSubscribeRequestHeader.getPid(); + client.consumerGroup = consumerGroup; + client.topic = topic; + client.url = url; + client.lastUpTime = new Date(); + + String groupTopicKey = client.consumerGroup + "@" + client.topic; + if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) { + List localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); + boolean isContains = false; + for (Client localClient : localClients) { + if (StringUtils.equals(localClient.url, client.url)) { + isContains = true; + localClient.lastUpTime = client.lastUpTime; + break; + } + } + if (!isContains) { + localClients.add(client); + } + } else { + List clients = new ArrayList<>(); + clients.add(client); + eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients); + } + } + } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 801481a418..0d14852f54 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -121,15 +121,12 @@ public boolean subscribe(List topicList, String url) throws Exception { start(); } - RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url); RequestParam subscribeParam = generateSubscribeRequestParam(topicList, url); long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String subRes = ""; - String heartRes = ""; try { - heartRes = HttpUtil.post(httpClient, target, heartBeatParam); subRes = HttpUtil.post(httpClient, target, subscribeParam); } catch (Exception ex) { throw new EventMeshException(ex); @@ -239,15 +236,12 @@ public void run() { public boolean unsubscribe(List topicList, String url) throws EventMeshException { subscription.removeAll(topicList); - RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url); RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url); long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String unSubRes = ""; - String heartRes = ""; try { - heartRes = HttpUtil.post(httpClient, target, heartBeatParam); unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam); } catch (Exception ex) { throw new EventMeshException(ex);