Skip to content

Commit

Permalink
cloudevents protocol tcp pub/sub for sdk (apache#622)
Browse files Browse the repository at this point in the history
1.cloudevents protocol tcp pub/sub for sdk
  • Loading branch information
xwm1992 committed Dec 27, 2021
1 parent 0fcb401 commit 08297fc
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 25 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ subprojects {
dependency "org.powermock:powermock-api-mockito2:2.0.2"

dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E
final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;

final byte[] headerData = serializeBytes(headerJson);
final byte[] bodyData = serializeBytes(bodyJson);
// final byte[] bodyData = serializeBytes(bodyJson);

byte[] bodyData = serializeBytes(bodyJson);

if (headerJson.contains("cloudevents")) {
bodyData = (byte[]) pkg.getBody();
}

if (log.isDebugEnabled()) {
log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -165,12 +167,16 @@ private static String generateRandomString(int length) {
}

public static CloudEvent generateCloudEventV1() {
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(TOPIC_PRX_WQ2ClientBroadCast)
.withSubject(TOPIC_PRX_WQ2ClientUniCast)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8))
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", "30000")
.build();
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
dependencies {
compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
implementation "io.cloudevents:cloudevents-core"
implementation "io.cloudevents:cloudevents-json-jackson"

testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
testImplementation "io.cloudevents:cloudevents-core"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
Expand Down Expand Up @@ -111,7 +113,9 @@ public Map<String, Object> toMap() {
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
pkg.setBody(cloudEvent);
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType())
.serialize(cloudEvent);
pkg.setBody(bodyByte);
return pkg;
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.cloudevents.resolver.http;

import io.cloudevents.CloudEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.cloudevents.resolver.http;

import io.cloudevents.CloudEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.cloudevents.resolver.http;

import io.cloudevents.CloudEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.protocol.cloudevents.resolver.tcp;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;

import org.apache.commons.lang3.StringUtils;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException;

public class TcpMessageProtocolResolver {

public static CloudEvent buildEvent(Header header, String cloudEventJson) throws ProtocolHandleException {
public static CloudEvent buildEvent(Header header, String cloudEventJson)
throws ProtocolHandleException {
CloudEventBuilder cloudEventBuilder;

String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString();
Expand All @@ -28,19 +55,21 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson) throws
if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME, protocolType)) {
throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
}
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
// todo: transform cloudEventJson to cloudEvent
cloudEventBuilder = CloudEventBuilder.v1(null);

if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v1(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
}

return cloudEventBuilder.build();

} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
// todo: transform cloudEventJson to cloudEvent
cloudEventBuilder = CloudEventBuilder.v03(null);
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v03(event);

for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
Expand Down
1 change: 1 addition & 0 deletions eventmesh-sdk-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {

// protocol
implementation "io.cloudevents:cloudevents-core"
implementation "io.cloudevents:cloudevents-json-jackson"
implementation "io.openmessaging:openmessaging-api"

compileOnly 'org.projectlombok:lombok:1.18.22'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.openmessaging.api.Message;

public class MessageUtils {
Expand Down Expand Up @@ -91,9 +95,15 @@ public static Package buildPackage(Object message, Command command) {
if (message instanceof CloudEvent) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, ((CloudEvent) message).getSpecVersion().toString());
msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(((CloudEvent) message).getDataContentType())
.serialize((CloudEvent) message);
msg.setBody(bodyByte);
} else if (message instanceof EventMeshMessage) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString());
msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
msg.setBody(message);
} else if (message instanceof Message) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME);
// todo: this version need to be confirmed.
Expand All @@ -102,8 +112,7 @@ public static Package buildPackage(Object message, Command command) {
// unsupported protocol for server
throw new IllegalArgumentException("Unsupported message protocol");
}
msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
msg.setBody(message);

return msg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand All @@ -37,12 +38,15 @@

import org.apache.commons.collections4.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -154,22 +158,29 @@ private class Handler extends SimpleChannelInboundHandler<Package> {
protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception {
Command cmd = msg.getHeader().getCmd();
log.info("|receive|type={}|msg={}", cmd, msg);
String protocolVersion = msg.getHeader().getProperty(Constants.PROTOCOL_VERSION).toString();
if (cmd == Command.REQUEST_TO_CLIENT) {
Package pkg = requestToClientAck(msg);
if (callback != null) {
callback.handle((CloudEvent) pkg.getBody(), ctx);
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
Package pkg = asyncMessageAck(msg);
if (callback != null) {
callback.handle((CloudEvent) msg, ctx);
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
Package pkg = broadcastMessageAck(msg);
if (callback != null) {
callback.handle((CloudEvent) msg, ctx);
CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
Expand All @@ -190,24 +201,21 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep
private Package requestToClientAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
// todo: Transform json to CloudEvents
msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
msg.setBody(tcpPackage.getBody());
return msg;
}

private Package asyncMessageAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
// todo: Transform to CloudEvents
msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
msg.setBody(tcpPackage.getBody());
return msg;
}

private Package broadcastMessageAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
// todo: Transform to CloudEvents
msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
msg.setBody(tcpPackage.getBody());
return msg;
}

Expand Down

0 comments on commit 08297fc

Please sign in to comment.