From 50f959d6c7416da01d51da0b33e8c9b47cee0611 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 11:40:24 -0400 Subject: [PATCH] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/AsyncPublishInstance.java | 5 +- .../demo/sub/controller/SubController.java | 9 ++- .../http/demo/sub/service/SubService.java | 57 ++++++++++++------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java index b718bcc2e0..558773fc56 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java @@ -35,6 +35,9 @@ public class AsyncPublishInstance { public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + public static void main(String[] args) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(eventMeshClientConfig); liteProducer.start(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < messageSize; i++) { LiteMessage liteMessage = new LiteMessage(); liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 8f8a7a7f9b..a3b9f4ede4 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONObject; +import org.apache.eventmesh.http.demo.sub.service.SubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -33,12 +35,17 @@ public class SubController { public static Logger logger = LoggerFactory.getLogger(SubController.class); + @Autowired + private SubService subService; + @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); JSONObject result = new JSONObject(); result.put("retCode", 1); - return result.toJSONString(); + String strResult = result.toJSONString(); + subService.consumeMessage(strResult); + return strResult; } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..8fb4746cef 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,18 +3,22 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; @Component public class SubService implements InitializingBean { @@ -38,6 +42,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +68,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } }