Skip to content

Commit

Permalink
[Issue #368] Fix Racing condition and memory leak issue in EventMesh …
Browse files Browse the repository at this point in the history
…SDK LiteConsumer and LiteProducer (#369)

* [Issue #337] Fix HttpSubscriber startup issue

* [Issue #337] test commit

* [Issue #337] revert test commit

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Address code review comment for Subscriber Demo App

* [Issue #368] Fix Racing condition and memory leak issue in EventMesh SDK LiteConsumer and LiteProducer

* [Issue #368] fix build issue

* [Issue #368] use try with resource statement for HttpClient

* [Issue #368] fix TLS1.1 and use TLS1.2 in HttpClient

Co-authored-by: j00441484 <[email protected]>
  • Loading branch information
jinrongluo and j00441484 authored May 31, 2021
1 parent 0faf1a1 commit 60d2d35
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
package org.apache.eventmesh.client.http;

import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.ssl.MyX509TrustManager;
import org.apache.eventmesh.client.http.util.HttpLoadBalanceUtils;
import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.security.SecureRandom;

public abstract class AbstractLiteClient {

Expand All @@ -46,4 +53,22 @@ public LiteClientConfig getLiteClientConfig() {
public void shutdown() throws Exception {
logger.info("AbstractLiteClient shutdown");
}

public CloseableHttpClient setHttpClient() throws Exception {
if (!liteClientConfig.isUseTls()) {
return HttpClients.createDefault();
}
SSLContext sslContext = null;
try {
String protocol = System.getProperty("ssl.client.protocol", "TLSv1.2");
TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()};
sslContext = SSLContext.getInstance(protocol);
sslContext.init(null, tm, new SecureRandom());
return HttpClients.custom().setSSLContext(sslContext)
.setSSLHostnameVerifier(new DefaultHostnameVerifier()).build();
} catch (Exception e) {
logger.error("Error in creating HttpClient.", e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,13 @@ public class LiteConsumer extends AbstractLiteClient {

private ThreadPoolExecutor consumeExecutor;

private static CloseableHttpClient httpClient = HttpClients.createDefault();

protected LiteClientConfig eventMeshClientConfig;

private List<String> subscription = Lists.newArrayList();

private LiteMessageListener messageListener;

protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));

public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception {
super(liteClientConfig);
Expand Down Expand Up @@ -110,7 +108,10 @@ public void start() throws Exception {
public void shutdown() throws Exception {
logger.info("LiteConsumer shutting down");
super.shutdown();
httpClient.close();
if (consumeExecutor != null) {
consumeExecutor.shutdown();
}
scheduler.shutdown();
started.compareAndSet(true, false);
logger.info("LiteConsumer shutdown");
}
Expand All @@ -126,10 +127,9 @@ public boolean subscribe(List<String> topicList, String url) throws Exception {
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String subRes = "";
try {

try (CloseableHttpClient httpClient = setHttpClient()){
subRes = HttpUtil.post(httpClient, target, subscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -211,10 +211,9 @@ public void run() {
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String res = "";
try {

try (CloseableHttpClient httpClient = setHttpClient()) {
res = HttpUtil.post(httpClient, target, requestParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
Expand All @@ -234,17 +233,16 @@ public void run() {
}, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
}

public boolean unsubscribe(List<String> topicList, String url) throws EventMeshException {
public boolean unsubscribe(List<String> topicList, String url) throws Exception {
subscription.removeAll(topicList);
RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String unSubRes = "";
try {

try (CloseableHttpClient httpClient = setHttpClient()) {
unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,8 @@ public class LiteProducer extends AbstractLiteClient {

public Logger logger = LoggerFactory.getLogger(LiteProducer.class);

private static CloseableHttpClient httpClient = HttpClients.createDefault();

public LiteProducer(LiteClientConfig liteClientConfig) {
super(liteClientConfig);
if (liteClientConfig.isUseTls()) {
setHttpClient();
}
}

private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
Expand All @@ -92,7 +87,6 @@ public void shutdown() throws Exception {
}
logger.info("LiteProducer shutting down");
super.shutdown();
httpClient.close();
started.compareAndSet(true, false);
logger.info("LiteProducer shutdown");
}
Expand Down Expand Up @@ -132,10 +126,9 @@ public boolean publish(LiteMessage message) throws Exception {
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String res = "";
try {

try (CloseableHttpClient httpClient = setHttpClient()) {
res = HttpUtil.post(httpClient, target, requestParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -191,10 +184,9 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception {
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String res = "";
try {

try (CloseableHttpClient httpClient = setHttpClient()) {
res = HttpUtil.post(httpClient, target, requestParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -246,32 +238,13 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
try {

try (CloseableHttpClient httpClient = setHttpClient()) {
HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
} catch (Exception ex) {
throw new EventMeshException(ex);
}

if (logger.isDebugEnabled()) {
logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message);
}
}

public static void setHttpClient() {
SSLContext sslContext = null;
try {
String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1");
TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()};
sslContext = SSLContext.getInstance(protocol);
sslContext.init(null, tm, new SecureRandom());
httpClient = HttpClients.custom().setSSLContext(sslContext)
.setSSLHostnameVerifier(new DefaultHostnameVerifier()).build();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void cleanup() {
logger.info("start destory ....");
try {
liteConsumer.unsubscribe(topicList, url);
} catch (EventMeshException e) {
} catch (Exception e) {
e.printStackTrace();
}
try {
Expand Down

0 comments on commit 60d2d35

Please sign in to comment.