diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java index 1710149f83..20302af8e6 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java @@ -18,10 +18,17 @@ package org.apache.eventmesh.client.http; import org.apache.eventmesh.client.http.conf.LiteClientConfig; +import org.apache.eventmesh.client.http.ssl.MyX509TrustManager; import org.apache.eventmesh.client.http.util.HttpLoadBalanceUtils; import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.security.SecureRandom; public abstract class AbstractLiteClient { @@ -46,4 +53,22 @@ public LiteClientConfig getLiteClientConfig() { public void shutdown() throws Exception { logger.info("AbstractLiteClient shutdown"); } + + public CloseableHttpClient setHttpClient() throws Exception { + if (!liteClientConfig.isUseTls()) { + return HttpClients.createDefault(); + } + SSLContext sslContext = null; + try { + String protocol = System.getProperty("ssl.client.protocol", "TLSv1.2"); + TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()}; + sslContext = SSLContext.getInstance(protocol); + sslContext.init(null, tm, new SecureRandom()); + return HttpClients.custom().setSSLContext(sslContext) + .setSSLHostnameVerifier(new DefaultHostnameVerifier()).build(); + } catch (Exception e) { + logger.error("Error in creating HttpClient.", e); + throw e; + } + } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 7c0ef10777..ede5ef0743 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -65,15 +65,13 @@ public class LiteConsumer extends AbstractLiteClient { private ThreadPoolExecutor consumeExecutor; - private static CloseableHttpClient httpClient = HttpClients.createDefault(); - protected LiteClientConfig eventMeshClientConfig; private List subscription = Lists.newArrayList(); private LiteMessageListener messageListener; - protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); + protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception { super(liteClientConfig); @@ -110,7 +108,10 @@ public void start() throws Exception { public void shutdown() throws Exception { logger.info("LiteConsumer shutting down"); super.shutdown(); - httpClient.close(); + if (consumeExecutor != null) { + consumeExecutor.shutdown(); + } + scheduler.shutdown(); started.compareAndSet(true, false); logger.info("LiteConsumer shutdown"); } @@ -126,10 +127,9 @@ public boolean subscribe(List topicList, String url) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String subRes = ""; - try { + + try (CloseableHttpClient httpClient = setHttpClient()){ subRes = HttpUtil.post(httpClient, target, subscribeParam); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { @@ -211,10 +211,9 @@ public void run() { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { @@ -234,17 +233,16 @@ public void run() { }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS); } - public boolean unsubscribe(List topicList, String url) throws EventMeshException { + public boolean unsubscribe(List topicList, String url) throws Exception { subscription.removeAll(topicList); RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url); long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String unSubRes = ""; - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java index 5e81579b74..d16b8f91ac 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java @@ -61,13 +61,8 @@ public class LiteProducer extends AbstractLiteClient { public Logger logger = LoggerFactory.getLogger(LiteProducer.class); - private static CloseableHttpClient httpClient = HttpClients.createDefault(); - public LiteProducer(LiteClientConfig liteClientConfig) { super(liteClientConfig); - if (liteClientConfig.isUseTls()) { - setHttpClient(); - } } private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); @@ -92,7 +87,6 @@ public void shutdown() throws Exception { } logger.info("LiteProducer shutting down"); super.shutdown(); - httpClient.close(); started.compareAndSet(true, false); logger.info("LiteProducer shutdown"); } @@ -132,10 +126,9 @@ public boolean publish(LiteMessage message) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { @@ -191,10 +184,9 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { @@ -246,32 +238,13 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th long startTime = System.currentTimeMillis(); String target = selectEventMesh(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout)); - } catch (Exception ex) { - throw new EventMeshException(ex); } if (logger.isDebugEnabled()) { logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message); } } - - public static void setHttpClient() { - SSLContext sslContext = null; - try { - String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1"); - TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()}; - sslContext = SSLContext.getInstance(protocol); - sslContext.init(null, tm, new SecureRandom()); - httpClient = HttpClients.custom().setSSLContext(sslContext) - .setSSLHostnameVerifier(new DefaultHostnameVerifier()).build(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } catch (KeyManagementException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - } - } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 9a51e4d2fa..e6b92afc99 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -82,7 +82,7 @@ public void cleanup() { logger.info("start destory ...."); try { liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { + } catch (Exception e) { e.printStackTrace(); } try {