Skip to content

Commit

Permalink
Change Tcp interface (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Nov 23, 2021
1 parent ac8db5a commit bcc1503
Show file tree
Hide file tree
Showing 18 changed files with 150 additions and 779 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.protocol.meshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
Expand All @@ -30,8 +26,14 @@
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;

public class SendMessageRequestProtocolResolver {

public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHandleException {
Expand Down Expand Up @@ -62,49 +64,51 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
cloudEventBuilder = CloudEventBuilder.v1();

event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
}
return event;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private CloseableHttpClient setHttpClient() throws EventMeshException {
}

protected String selectEventMesh() {
// todo: target endpoint maybe destroy, should remove the bad endpoint
if (eventMeshHttpClientConfig.isUseTls()) {
return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,32 @@ public class EventMeshHttpClientConfig {
@Builder.Default
private int consumeThreadMax = 5;

private String env;
@Builder.Default
private String env = "";

@Builder.Default
private String consumerGroup = "DefaultConsumerGroup";

@Builder.Default
private String producerGroup = "DefaultProducerGroup";

private String idc;
@Builder.Default
private String idc = "";

@Builder.Default
private String ip = "127.0.0.1";

private String pid;
@Builder.Default
private String pid = "";

private String sys;
@Builder.Default
private String sys = "";

private String userName;
@Builder.Default
private String userName = "";

private String password;
@Builder.Default
private String password = "";

@Builder.Default
private boolean useTls = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -45,7 +47,6 @@
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -56,7 +57,7 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo

private final ThreadPoolExecutor consumeExecutor;

private static final List<SubscriptionItem> subscription = Lists.newArrayList();
private static final List<SubscriptionItem> SUBSCRIPTIONS = Collections.synchronizedList(new ArrayList<>());

private final ScheduledThreadPoolExecutor scheduler;

Expand Down Expand Up @@ -100,7 +101,7 @@ public void subscribe(List<SubscriptionItem> topicList, String subscribeUrl) thr
if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
subscription.addAll(topicList);
SUBSCRIPTIONS.addAll(topicList);
} catch (Exception ex) {
throw new EventMeshException(String.format("Subscribe topic error, target:%s", target), ex);
}
Expand Down Expand Up @@ -156,7 +157,7 @@ public void unsubscribe(List<String> topicList, String unSubscribeUrl) throws Ev
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
// todo: avoid concurrentModifiedException
subscription.removeIf(item -> topicList.contains(item.getTopic()));
SUBSCRIPTIONS.removeIf(item -> topicList.contains(item.getTopic()));
} catch (Exception ex) {
throw new EventMeshException(String.format("Unsubscribe topic error, target:%s", target), ex);
}
Expand All @@ -173,15 +174,6 @@ public void close() throws EventMeshException {
log.info("LiteConsumer shutdown");
}

private String selectEventMesh() {
// todo: target endpoint maybe destroy, should remove the bad endpoint
if (eventMeshHttpClientConfig.isUseTls()) {
return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select();
} else {
return Constants.HTTP_PROTOCOL_PREFIX + eventMeshServerSelector.select();
}
}

private RequestParam buildCommonRequestParam() {
return new RequestParam(HttpMethod.POST)
.addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
Expand All @@ -191,6 +183,7 @@ private RequestParam buildCommonRequestParam() {
.addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
.addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
// add protocol version?
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@
import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocolProducer<CloudEvent> {

private static final String PROTOCOL_TYPE = "cloudevents";

public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException {
super(eventMeshHttpClientConfig);
}

@Override
public void publish(CloudEvent cloudEvent) throws EventMeshException {
validateCloudEvent(cloudEvent);
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
// todo: Can put to abstract class, all protocol use the same send method? This can be a template method
RequestParam requestParam = buildCommonPostParam(cloudEvent)
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_ASYNC.getRequestCode());
String target = selectEventMesh();
try {
Expand All @@ -53,7 +57,8 @@ public void publish(CloudEvent cloudEvent) throws EventMeshException {
@Override
public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshException {
validateCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(cloudEvent)
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode())
.setTimeout(timeout);
String target = selectEventMesh();
Expand All @@ -71,15 +76,16 @@ public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshE
}

@Override
public void request(CloudEvent cloudEvent, RRCallback<CloudEvent> rrCallback, long timeout)
public void request(final CloudEvent cloudEvent, final RRCallback<CloudEvent> rrCallback, long timeout)
throws EventMeshException {
validateCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(cloudEvent)
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode())
.setTimeout(timeout);
String target = selectEventMesh();
RRCallbackResponseHandlerAdapter<CloudEvent> adapter =
new RRCallbackResponseHandlerAdapter<>(cloudEvent, rrCallback, timeout);
RRCallbackResponseHandlerAdapter<CloudEvent> adapter = new RRCallbackResponseHandlerAdapter<>(
enhanceCloudEvent, rrCallback, timeout);
try {
HttpUtils.post(httpClient, null, target, requestParam, adapter);
} catch (IOException e) {
Expand All @@ -97,14 +103,27 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) {
requestParam
.addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
// todo: add producerGroup to header, set protocol type, protocol version
.addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
// todo: move producerGroup tp header
.addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent));
return requestParam;
}

private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent) {
return CloudEventBuilder.from(cloudEvent)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
.withExtension(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
.withExtension(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
.withExtension(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
.withExtension(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
.withExtension(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.withExtension(ProtocolKey.PROTOCOL_DESC, cloudEvent.getSpecVersion().name())
.withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString())
.build();
}

private CloudEvent transformMessage(EventMeshRetObj retObj) {
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(retObj.getRetMsg(),
SendMessageResponseBody.ReplyMessage.class);
Expand Down
Loading

0 comments on commit bcc1503

Please sign in to comment.