Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3192] replace nacos-config module http client #3523

Merged
merged 7 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.config.server.service;

import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
Expand All @@ -30,7 +31,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.HttpURLConnection;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -178,19 +178,16 @@ public SampleResult call() throws Exception {
}

String urlAll = getUrl(ip, url) + "?" + paramUrl;
com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult result = NotifyService
RestResult<String> result = NotifyService
.invokeURL(urlAll, null, Constants.ENCODE);

// Http code 200
if (result.code == HttpURLConnection.HTTP_OK) {
String json = result.content;
SampleResult resultObj = JSONUtils.deserializeObject(json, new TypeReference<SampleResult>() {
if (result.ok()) {
return JSONUtils.deserializeObject(result.getData(), new TypeReference<SampleResult>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can unified json utils to Jackson util?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will modify

});
return resultObj;

} else {

LogUtil.DEFAULT_LOG.info("Can not get clientInfo from {} with {}", ip, result.code);
LogUtil.DEFAULT_LOG.info("Can not get clientInfo from {} with {}", ip, result.getData());
return null;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
Expand All @@ -25,20 +30,11 @@
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.InetUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -63,7 +59,6 @@ public class AsyncNotifyService {
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
httpclient.start();

// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
Expand All @@ -89,7 +84,7 @@ public void onEvent(Event event) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, queue));
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}

Expand All @@ -100,21 +95,20 @@ public Class<? extends Event> subscribeType() {
});
}

private RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
.setSocketTimeout(PropertyUtil.getNotifySocketTimeout()).build();

private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig)
.build();
private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);

private ServerMemberManager memberManager;

class AsyncTask implements Runnable {

public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
this.httpclient = httpclient;
private Queue<NotifySingleTask> queue;

private NacosAsyncRestTemplate restTemplate;

public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
this.restTemplate = restTemplate;
this.queue = queue;
}

Expand All @@ -138,52 +132,52 @@ private void executeAsyncInvoke() {
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIp());
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIp());
if (task.isBeta) {
request.setHeader("isBeta", "true");
header.addParam("isBeta", "true");
}
AsyncNotifyCallBack asyncNotifyCallBack = new AsyncNotifyCallBack(task);
try {
restTemplate.get(task.url, header, Query.EMPTY, String.class, asyncNotifyCallBack);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we modified this restTemplate.get method, if any error when execute get, call the callback.onError in the restTemplate.get rather than throw exception?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, very good advice

} catch (Exception e) {
asyncNotifyCallBack.onError(e);
}
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
}

private Queue<NotifySingleTask> queue;

private CloseableHttpAsyncClient httpclient;

}

private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}

class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {
class AsyncNotifyCallBack implements Callback<String> {

private NotifySingleTask task;

public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
public AsyncNotifyCallBack(NotifySingleTask task) {
this.task = task;
this.httpClient = httpClient;
}

@Override
public void completed(HttpResponse response) {
public void onReceive(RestResult<String> result) {

long delayed = System.currentTimeMillis() - task.getLastModified();

if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
if (result.ok()) {
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_OK, delayed,
task.target);
} else {
LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode());
task.getGroup(), task.getLastModified(), result.getCode());
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);
Expand All @@ -197,11 +191,10 @@ public void completed(HttpResponse response) {

MetricsMonitor.getConfigNotifyException().increment();
}
HttpClientUtils.closeQuietly(response);
}

@Override
public void failed(Exception ex) {
public void onError(Throwable ex) {

long delayed = System.currentTimeMillis() - task.getLastModified();
LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", task.target, task.getDataId(),
Expand All @@ -219,7 +212,7 @@ public void failed(Exception ex) {
}

@Override
public void cancelled() {
public void onCancel() {

LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", task.target,
task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");
Expand All @@ -231,10 +224,6 @@ public void cancelled() {

MetricsMonitor.getConfigNotifyException().increment();
}

private NotifySingleTask task;

private CloseableHttpAsyncClient httpClient;
}

static class NotifySingleTask extends NotifyTask {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.common.http.AbstractHttpClientFactory;
import com.alibaba.nacos.common.http.HttpClientBeanHolder;
import com.alibaba.nacos.common.http.HttpClientConfig;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* http client manager.
*
* @author mai.jh
*/
public final class HttpClientManager {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientManager.class);

/**
* Connection timeout and socket timeout with other servers.
*/
private static final int TIMEOUT = 500;

private static final NacosRestTemplate NACOS_REST_TEMPLATE;

private static final NacosAsyncRestTemplate NACOS_ASYNC_REST_TEMPLATE;

static {
// build nacos rest template
NACOS_REST_TEMPLATE = HttpClientBeanHolder.getNacosRestTemplate(new ConfigHttpClientFactory(TIMEOUT, TIMEOUT));
NACOS_ASYNC_REST_TEMPLATE = HttpClientBeanHolder.getNacosAsyncRestTemplate(
new ConfigHttpClientFactory(PropertyUtil.getNotifyConnectTimeout(),
PropertyUtil.getNotifySocketTimeout()));

ThreadUtils.addShutdownHook(new Runnable() {
@Override
public void run() {
shutdown();
}
});
}

public static NacosRestTemplate getNacosRestTemplate() {
return NACOS_REST_TEMPLATE;
}

public static NacosAsyncRestTemplate getNacosAsyncRestTemplate() {
return NACOS_ASYNC_REST_TEMPLATE;
}

private static void shutdown() {
LOGGER.warn("[ConfigServer-HttpClientManager] Start destroying NacosRestTemplate");
try {
final String httpClientFactoryBeanName = ConfigHttpClientFactory.class.getName();
HttpClientBeanHolder.shutdownNacostSyncRest(httpClientFactoryBeanName);
HttpClientBeanHolder.shutdownNacosAsyncRest(httpClientFactoryBeanName);
} catch (Exception ex) {
LOGGER.error("[ConfigServer-HttpClientManager] An exception occurred when the HTTP client was closed : {}",
ExceptionUtil.getStackTrace(ex));
}
LOGGER.warn("[ConfigServer-HttpClientManager] Destruction of the end");
}

/**
* http client factory.
*/
private static class ConfigHttpClientFactory extends AbstractHttpClientFactory {

private final int conTimeOutMillis;

private final int readTimeOutMillis;

public ConfigHttpClientFactory(int conTimeOutMillis, int readTimeOutMillis) {
this.conTimeOutMillis = conTimeOutMillis;
this.readTimeOutMillis = readTimeOutMillis;
}

@Override
protected HttpClientConfig buildHttpClientConfig() {
return HttpClientConfig.builder().setConTimeOutMillis(conTimeOutMillis)
.setReadTimeOutMillis(readTimeOutMillis).build();
}

@Override
protected Logger assignLogger() {
return LOGGER;
}
}
}
Loading