Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventmesh message protocol for http pub/sub with standalone connector #619

Merged
merged 12 commits into from
Nov 30, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PushMessageRequestBody extends Body {

public static final String RANDOMNO = "randomNo";
public static final String TOPIC = "topic";
public static final String BIZSEQNO = "bizSeqNo";
public static final String BIZSEQNO = "bizseqno";
public static final String UNIQUEID = "uniqueId";
public static final String CONTENT = "content";
public static final String EXTFIELDS = "extFields";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@

public class ReplyMessageRequestBody extends Body {

public static final String ORIGTOPIC = "origTopic";
public static final String BIZSEQNO = "bizSeqNo";
public static final String ORIGTOPIC = "origtopic";
public static final String BIZSEQNO = "bizseqno";
public static final String UNIQUEID = "uniqueId";
public static final String CONTENT = "content";
public static final String EXTFIELDS = "extFields";
public static final String PRODUCERGROUP = "producerGroup";
public static final String PRODUCERGROUP = "producergroup";

private String bizSeqNo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@

public class SendMessageBatchV2RequestBody extends Body {

public static final String BIZSEQNO = "bizSeqNo";
public static final String BIZSEQNO = "bizseqno";
public static final String TOPIC = "topic";
public static final String MSG = "msg";
public static final String TAG = "tag";
public static final String TTL = "ttl";
public static final String PRODUCERGROUP = "producerGroup";
public static final String PRODUCERGROUP = "producergroup";

private String bizSeqNo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
public class SendMessageRequestBody extends Body {

public static final String TOPIC = "topic";
public static final String BIZSEQNO = "bizSeqNo";
public static final String UNIQUEID = "uniqueId";
public static final String BIZSEQNO = "bizseqno";
public static final String UNIQUEID = "uniqueid";
public static final String CONTENT = "content";
public static final String TTL = "ttl";
public static final String TAG = "tag";
public static final String EXTFIELDS = "extFields";
public static final String PRODUCERGROUP = "producerGroup";
public static final String PRODUCERGROUP = "producergroup";

private String topic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,34 @@

public class ProtocolKey {

public static final String REQUEST_CODE = "Code";
public static final String LANGUAGE = "Language";
public static final String VERSION = "Version";
public static final String REQUEST_CODE = "code";
public static final String LANGUAGE = "language";
public static final String VERSION = "version";

public static final String PROTOCOL_TYPE = "protocol_type";
public static final String PROTOCOL_TYPE = "protocoltype";

public static final String PROTOCOL_VERSION = "protocol_version";
public static final String PROTOCOL_VERSION = "protocolversion";

public static final String PROTOCOL_DESC = "protocol_desc";
public static final String PROTOCOL_DESC = "protocoldesc";

public static class ClientInstanceKey {
////////////////////////////////////Protocol layer requester description///////////
public static final String ENV = "Env";
public static final String IDC = "Idc";
public static final String SYS = "Sys";
public static final String PID = "Pid";
public static final String IP = "Ip";
public static final String USERNAME = "Username";
public static final String PASSWD = "Passwd";
public static final String ENV = "env";
public static final String IDC = "idc";
public static final String SYS = "sys";
public static final String PID = "pid";
public static final String IP = "ip";
public static final String USERNAME = "username";
public static final String PASSWD = "passwd";
}


public static class EventMeshInstanceKey {
///////////////////////////////////////////////Protocol layer EventMesh description
public static final String EVENTMESHCLUSTER = "EventMeshCluster";
public static final String EVENTMESHIP = "EventMeshIp";
public static final String EVENTMESHENV = "EventMeshEnv";
public static final String EVENTMESHIDC = "EventMeshIdc";
public static final String EVENTMESHCLUSTER = "eventmeshcluster";
public static final String EVENTMESHIP = "eventmeship";
public static final String EVENTMESHENV = "eventmeshenv";
public static final String EVENTMESHIDC = "eventmeshidc";
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static Package rrResponse(Package request) {
public static EventMeshMessage generateSyncRRMqMsg() {
EventMeshMessage mqMsg = new EventMeshMessage();
mqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest);
mqMsg.getProperties().put("msgType", "persistent");
mqMsg.getProperties().put("msgtype", "persistent");
mqMsg.getProperties().put("ttl", "300000");
mqMsg.getProperties().put("keys", generateRandomString(16));
mqMsg.setBody("testSyncRR");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import java.net.URI;
import java.nio.charset.StandardCharsets;

public class SendMessageBatchV2ProtocolResolver {
Expand Down Expand Up @@ -61,6 +63,8 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan

event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
Expand All @@ -84,6 +88,8 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.nio.charset.StandardCharsets;

import io.cloudevents.CloudEvent;
Expand Down Expand Up @@ -65,6 +66,8 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan

event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
Expand All @@ -79,16 +82,19 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag())
.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
Expand All @@ -103,11 +109,12 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag())
.build();
}
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
for (CloudEvent event : eventList) {
//validate event
if (StringUtils.isBlank(event.getId())
|| event.getSource() != null
|| event.getSpecVersion() != null
|| event.getSource() == null
|| event.getSpecVersion() == null
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
Expand Down Expand Up @@ -185,7 +185,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

for (CloudEvent cloudEvent : eventList) {
if (StringUtils.isBlank(cloudEvent.getSubject())
|| cloudEvent.getData() != null) {
|| cloudEvent.getData() == null) {
continue;
}

Expand Down Expand Up @@ -215,7 +215,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
if (StringUtils.isBlank(ttl) || !StringUtils.isNumeric(ttl)) {
cloudEvent = CloudEventBuilder.from(cloudEvent)
.withExtension(SendMessageRequestBody.TTL, String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS))
.withExtension("msgType", "persistent")
.withExtension("msgtype", "persistent")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
// todo: use validate processor to check
//validate event
if (StringUtils.isBlank(event.getId())
|| event.getSource() != null
|| event.getSpecVersion() != null
|| event.getSource() == null
|| event.getSpecVersion() == null
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
responseEventMeshCommand = request.createHttpCommandResponse(
Expand Down Expand Up @@ -148,7 +148,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
if (StringUtils.isBlank(bizNo)
|| StringUtils.isBlank(topic)
|| StringUtils.isBlank(producerGroup)
|| event.getData() != null) {
|| event.getData() == null) {
responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageBatchV2ResponseHeader,
SendMessageBatchV2ResponseBody
Expand Down Expand Up @@ -221,7 +221,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

try {
event = CloudEventBuilder.from(event)
.withExtension("msgType", "persistent")
.withExtension("msgtype", "persistent")
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);

//validate event
if (event != null
if (event == null
|| StringUtils.isBlank(event.getId())
|| event.getSource() != null
|| event.getSpecVersion() != null
|| event.getSource() == null
|| event.getSpecVersion() == null
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
Expand Down Expand Up @@ -126,7 +126,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
if (StringUtils.isBlank(bizNo)
|| StringUtils.isBlank(uniqueId)
|| StringUtils.isBlank(producerGroup)
|| event.getData() != null) {
|| event.getData() == null) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
replyMessageResponseHeader,
ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
Expand Down Expand Up @@ -182,7 +182,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
// omsMsg.setBody(replyMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
event = CloudEventBuilder.from(event)
.withSubject(replyTopic)
.withExtension("msgType", "persistent")
.withExtension("msgtype", "persistent")
.withExtension(Constants.PROPERTY_MESSAGE_TIMEOUT, String.valueOf(EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS))
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());

//validate event
if (event != null
if (event == null
|| StringUtils.isBlank(event.getId())
|| event.getSource() != null
|| event.getSpecVersion() != null
|| event.getSource() == null
|| event.getSpecVersion() == null
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
Expand Down Expand Up @@ -131,7 +131,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
|| StringUtils.isBlank(uniqueId)
|| StringUtils.isBlank(producerGroup)
|| StringUtils.isBlank(topic)
|| event.getData() != null) {
|| event.getData() == null) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
Expand Down Expand Up @@ -203,7 +203,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
// // bizNo
// omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo());
event = CloudEventBuilder.from(event)
.withExtension("msgType", "persistent")
.withExtension("msgtype", "persistent")
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
Expand Down
Loading