Skip to content

Commit

Permalink
[Issue apache#658] fix build checkstyles
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo committed Feb 16, 2022
1 parent cf24826 commit 1d143e9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.WebhookUtil;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -51,7 +52,6 @@
import java.util.List;
import java.util.Map;

import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.WebhookUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand All @@ -47,7 +47,6 @@
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

Expand Down Expand Up @@ -78,10 +77,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");

public Logger logger = LoggerFactory.getLogger(this.getClass());

private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;

public String currPushUrl;
private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;

public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
Expand Down Expand Up @@ -112,18 +109,18 @@ public void tryHTTPRequest() {
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
handleMsgContext.getEventMeshHTTPServer()
.getEventMeshHttpConfiguration().eventMeshCluster);
handleMsgContext.getEventMeshHTTPServer()
.getEventMeshHttpConfiguration().eventMeshCluster);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);

CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.build();
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.build();
handleMsgContext.setEvent(event);

String content = "";
Expand All @@ -133,7 +130,7 @@ public void tryHTTPRequest() {
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);

ProtocolTransportObject protocolTransportObject =
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} catch (Exception ex) {
return;
Expand All @@ -143,25 +140,25 @@ public void tryHTTPRequest() {
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
RandomStringUtils.generateNum(20)));
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
handleMsgContext.getBizSeqNo()));
handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
RandomStringUtils.generateNum(20)));
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
handleMsgContext.getUniqueId()));
handleMsgContext.getUniqueId()));
}

body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
handleMsgContext.getMsgRandomNo()));
handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));

body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));

HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);

Expand All @@ -181,7 +178,7 @@ public void tryHTTPRequest() {
addToWaitingMap(this);

cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
IPUtils.getLocalAddress(), currPushUrl);
IPUtils.getLocalAddress(), currPushUrl);

try {
eventMeshHTTPServer.httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
Expand Down Expand Up @@ -245,13 +242,13 @@ public Object handleResponse(HttpResponse response) {

if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
handleMsgContext.getTopic(),
handleMsgContext.getEvent());
handleMsgContext.getTopic(),
handleMsgContext.getEvent());
} else {
messageLogger
.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
}
} catch (IOException e) {
messageLogger.error("push2client err", e);
Expand All @@ -267,16 +264,16 @@ public Object handleResponse(HttpResponse response) {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("asyncPushRequest={")
.append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
.append(",startIdx=").append(startIdx)
.append(",retryTimes=").append(retryTimes)
.append(",uniqueId=").append(handleMsgContext.getUniqueId())
.append(",executeTime=")
.append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
.append(",lastPushTime=")
.append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
.append(",createTime=")
.append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
.append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
.append(",startIdx=").append(startIdx)
.append(",retryTimes=").append(retryTimes)
.append(",uniqueId=").append(handleMsgContext.getUniqueId())
.append(",executeTime=")
.append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
.append(",lastPushTime=")
.append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
.append(",createTime=")
.append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
return sb.toString();
}

Expand All @@ -290,10 +287,10 @@ boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {

// Response Status code is 429 Too Many Requests
// retry after the time specified by the header
Optional<Header> optHeader = Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
if (optHeader.isPresent() && StringUtils.isNumeric(optHeader.get().getValue())) {
delayRetry(Long.parseLong(optHeader.get().getValue()));
}
Optional<Header> optHeader = Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
if (optHeader.isPresent() && StringUtils.isNumeric(optHeader.get().getValue())) {
delayRetry(Long.parseLong(optHeader.get().getValue()));
}
return false;
} else if (httpStatus == HttpStatus.SC_GONE || httpStatus == HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE) {
// failed with no retry
Expand All @@ -312,8 +309,8 @@ ClientRetCode processResponseContent(String content) {

try {
Map<String, Object> ret =
JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
});
JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
});
Integer retCode = (Integer) ret.get("retCode");
if (retCode != null && ClientRetCode.contains(retCode)) {
return ClientRetCode.get(retCode);
Expand All @@ -322,15 +319,15 @@ ClientRetCode processResponseContent(String content) {
return ClientRetCode.FAIL;
} catch (NumberFormatException e) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
} catch (JsonException e) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
} catch (Throwable t) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
}
}
Expand All @@ -341,7 +338,7 @@ private void addToWaitingMap(AsyncHTTPPushRequest request) {
return;
}
waitingRequests
.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@

package org.apache.eventmesh.runtime.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.auth.AuthService;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicHeader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for implementing CloudEvents Http Webhook spec
*
Expand Down

0 comments on commit 1d143e9

Please sign in to comment.