Skip to content

Commit

Permalink
[Feature apache#562] Implement EventMeshMessage protocol adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Nov 18, 2021
1 parent d7ddac8 commit c534985
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
Expand Down Expand Up @@ -41,33 +42,36 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
event = JsonUtils.deserialize(content, CloudEventV1.class);
event = CloudEventBuilder.from(event)
.withExtension("code", code)
.withExtension("env", env)
.withExtension("idc", idc)
.withExtension("ip", ip)
.withExtension("pid", pid)
.withExtension("sys", sys)
.withExtension("username", username)
.withExtension("passwd", passwd)
.withExtension("version", version.getVersion())
.withExtension("language", language)
.withExtension("protocolType", protocolType)
.withExtension("protocolDesc", protocolDesc)
.withExtension("protocolVersion", protocolVersion)
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
event = JsonUtils.deserialize(content, CloudEventV03.class);
event = CloudEventBuilder.from(event)
.withExtension("code", code)
.withExtension("env", env)
.withExtension("idc", idc)
.withExtension("ip", ip)
.withExtension("pid", pid)
.withExtension("sys", sys)
.withExtension("username", username)
.withExtension("passwd", passwd)
.withExtension("version", version.getVersion())
.withExtension("language", language)
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
}
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
Expand Down Expand Up @@ -42,33 +43,36 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
event = JsonUtils.deserialize(content, CloudEventV1.class);
event = CloudEventBuilder.from(event)
.withExtension("code", code)
.withExtension("env", env)
.withExtension("idc", idc)
.withExtension("ip", ip)
.withExtension("pid", pid)
.withExtension("sys", sys)
.withExtension("username", username)
.withExtension("passwd", passwd)
.withExtension("version", version.getVersion())
.withExtension("language", language)
.withExtension("protocolType", protocolType)
.withExtension("protocolDesc", protocolDesc)
.withExtension("protocolVersion", protocolVersion)
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
event = JsonUtils.deserialize(content, CloudEventV03.class);
event = CloudEventBuilder.from(event)
.withExtension("code", code)
.withExtension("env", env)
.withExtension("idc", idc)
.withExtension("ip", ip)
.withExtension("pid", pid)
.withExtension("sys", sys)
.withExtension("username", username)
.withExtension("passwd", passwd)
.withExtension("version", version.getVersion())
.withExtension("language", language)
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
}
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;

public class TcpMessageProtocolResolver {

Expand All @@ -24,7 +25,7 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH
protocolType, protocolVersion, protocolDesc));
}

if (!StringUtils.equals("cloudevents", protocolType)) {
if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME, protocolType)) {
throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
}
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage;

import io.cloudevents.CloudEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.tcp.TcpMessageProtocolResolver;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class EventMeshMessageProtocolAdaptor<T> implements ProtocolAdaptor<T> {

@Override
public CloudEvent toCloudEvent(T protocol) throws ProtocolHandleException {
if (protocol instanceof Package) {
Header header = ((Package) protocol).getHeader();
Object body = ((Package) protocol).getBody();

return deserializeTcpProtocol(header, body);

} else if (protocol instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) protocol).getHeader();
Body body = ((HttpCommand) protocol).getBody();
String requestCode = ((HttpCommand) protocol).getRequestCode();

return deserializeHttpProtocol(requestCode, header, body);
} else {
throw new ProtocolHandleException(String.format("protocol class: %s", protocol.getClass()));
}
}

private CloudEvent deserializeTcpProtocol(Header header, Object body) throws ProtocolHandleException {
return TcpMessageProtocolResolver.buildEvent(header, body);
}

private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException {

if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) {
return SendMessageBatchProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_BATCH_SEND_V2.getRequestCode()).equals(requestCode)) {
return SendMessageBatchV2ProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else {
throw new ProtocolHandleException(String.format("unsupported requestCode: %s", requestCode));
}

}

@Override
public List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleException {
return null;
}

@Override
public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();

if (StringUtils.equals("http", protocolDesc)) {
return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
} else if (StringUtils.equals("tcp", protocolDesc)) {
return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent);
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
}
}

@Override
public String getProtocolType() {
return EventMeshMessageProtocolConstant.PROTOCOL_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage;

public enum EventMeshMessageProtocolConstant {
;
public static final String PROTOCOL_NAME = "eventmeshmessage";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.header.Header;

public class SendMessageBatchProtocolResolver {
public static CloudEvent buildEvent(Header header, Body body) {
return null;
}
}
Loading

0 comments on commit c534985

Please sign in to comment.