From 08297fcb4646e77e7b1e3336164389b7a5faba5c Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Wed, 1 Dec 2021 21:40:08 +0800 Subject: [PATCH] cloudevents protocol tcp pub/sub for sdk (#622) 1.cloudevents protocol tcp pub/sub for sdk --- build.gradle | 1 + .../common/protocol/tcp/codec/Codec.java | 8 ++- .../tcp/common/EventMeshTestUtils.java | 10 +++- .../build.gradle | 1 + .../CloudEventsProtocolAdaptor.java | 6 ++- .../SendMessageBatchProtocolResolver.java | 17 +++++++ .../SendMessageBatchV2ProtocolResolver.java | 17 +++++++ .../SendMessageRequestProtocolResolver.java | 17 +++++++ .../tcp/TcpMessageProtocolResolver.java | 49 +++++++++++++++---- eventmesh-sdk-java/build.gradle | 1 + .../client/tcp/common/MessageUtils.java | 13 ++++- .../cloudevent/CloudEventTCPSubClient.java | 26 ++++++---- 12 files changed, 141 insertions(+), 25 deletions(-) diff --git a/build.gradle b/build.gradle index 95b3e5c319..1485d4e411 100644 --- a/build.gradle +++ b/build.gradle @@ -451,6 +451,7 @@ subprojects { dependency "org.powermock:powermock-api-mockito2:2.0.2" dependency "io.cloudevents:cloudevents-core:2.2.0" + dependency "io.cloudevents:cloudevents-json-jackson:2.2.0" } } } \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 191a09961d..9570100c98 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -76,7 +76,13 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null; final byte[] headerData = serializeBytes(headerJson); - final byte[] bodyData = serializeBytes(bodyJson); +// final byte[] bodyData = serializeBytes(bodyJson); + + byte[] bodyData = serializeBytes(bodyJson); + + if (headerJson.contains("cloudevents")) { + bodyData = (byte[]) pkg.getBody(); + } if (log.isDebugEnabled()) { log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index 43598324a8..894448275b 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -36,6 +36,8 @@ import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -165,12 +167,16 @@ private static String generateRandomString(int length) { } public static CloudEvent generateCloudEventV1() { + Map content = new HashMap<>(); + content.put("content", "testAsyncMessage"); + CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(TOPIC_PRX_WQ2ClientBroadCast) + .withSubject(TOPIC_PRX_WQ2ClientUniCast) .withSource(URI.create("/")) + .withDataContentType("application/cloudevents+json") .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) - .withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8)) + .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension("ttl", "30000") .build(); return event; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle index 4a82353e6e..cf2b974f53 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle @@ -18,6 +18,7 @@ dependencies { compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api") implementation "io.cloudevents:cloudevents-core" + implementation "io.cloudevents:cloudevents-json-jackson" testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") testImplementation "io.cloudevents:cloudevents-core" diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java index b670ef9084..6e540e6cdd 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java @@ -18,6 +18,8 @@ package org.apache.eventmesh.protocol.cloudevents; import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; + import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; @@ -111,7 +113,9 @@ public Map toMap() { return httpCommand; } else if (StringUtils.equals("tcp", protocolDesc)) { Package pkg = new Package(); - pkg.setBody(cloudEvent); + byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType()) + .serialize(cloudEvent); + pkg.setBody(bodyByte); return pkg; } else { throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc)); diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java index 6ea8d2695f..2b4f180038 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.protocol.cloudevents.resolver.http; import io.cloudevents.CloudEvent; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java index 2c33aec9bf..a863bb5924 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.protocol.cloudevents.resolver.http; import io.cloudevents.CloudEvent; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java index 7ef619a66e..6c80dd7cb3 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.protocol.cloudevents.resolver.http; import io.cloudevents.CloudEvent; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java index 0e2b303562..bec9b8da44 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java @@ -1,17 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.protocol.cloudevents.resolver.tcp; -import io.cloudevents.CloudEvent; -import io.cloudevents.SpecVersion; -import io.cloudevents.core.builder.CloudEventBuilder; -import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant; +import org.apache.commons.lang3.StringUtils; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; + +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.core.JsonProcessingException; + public class TcpMessageProtocolResolver { - public static CloudEvent buildEvent(Header header, String cloudEventJson) throws ProtocolHandleException { + public static CloudEvent buildEvent(Header header, String cloudEventJson) + throws ProtocolHandleException { CloudEventBuilder cloudEventBuilder; String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString(); @@ -28,10 +55,11 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson) throws if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME, protocolType)) { throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType)); } - if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { - // todo: transform cloudEventJson to cloudEvent - cloudEventBuilder = CloudEventBuilder.v1(null); + if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes( + StandardCharsets.UTF_8)); + cloudEventBuilder = CloudEventBuilder.v1(event); for (String propKey : header.getProperties().keySet()) { cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString()); } @@ -39,8 +67,9 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson) throws return cloudEventBuilder.build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { - // todo: transform cloudEventJson to cloudEvent - cloudEventBuilder = CloudEventBuilder.v03(null); + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes( + StandardCharsets.UTF_8)); + cloudEventBuilder = CloudEventBuilder.v03(event); for (String propKey : header.getProperties().keySet()) { cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString()); diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle index 108e418652..a60dc141b8 100644 --- a/eventmesh-sdk-java/build.gradle +++ b/eventmesh-sdk-java/build.gradle @@ -27,6 +27,7 @@ dependencies { // protocol implementation "io.cloudevents:cloudevents-core" + implementation "io.cloudevents:cloudevents-json-jackson" implementation "io.openmessaging:openmessaging-api" compileOnly 'org.projectlombok:lombok:1.18.22' diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 16cbeef753..d1020d435f 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -27,13 +27,17 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.utils.JsonUtils; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import io.cloudevents.CloudEvent; import io.cloudevents.SpecVersion; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import io.openmessaging.api.Message; public class MessageUtils { @@ -91,9 +95,15 @@ public static Package buildPackage(Object message, Command command) { if (message instanceof CloudEvent) { msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, ((CloudEvent) message).getSpecVersion().toString()); + msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); + byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(((CloudEvent) message).getDataContentType()) + .serialize((CloudEvent) message); + msg.setBody(bodyByte); } else if (message instanceof EventMeshMessage) { msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME); msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString()); + msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); + msg.setBody(message); } else if (message instanceof Message) { msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME); // todo: this version need to be confirmed. @@ -102,8 +112,7 @@ public static Package buildPackage(Object message, Command command) { // unsupported protocol for server throw new IllegalArgumentException("Unsupported message protocol"); } - msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp"); - msg.setBody(message); + return msg; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index 9ea509be3c..1f1f7a09e8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -24,6 +24,7 @@ import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.SubscriptionMode; @@ -37,12 +38,15 @@ import org.apache.commons.collections4.CollectionUtils; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; @@ -154,22 +158,29 @@ private class Handler extends SimpleChannelInboundHandler { protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { Command cmd = msg.getHeader().getCmd(); log.info("|receive|type={}|msg={}", cmd, msg); + String protocolVersion = msg.getHeader().getProperty(Constants.PROTOCOL_VERSION).toString(); if (cmd == Command.REQUEST_TO_CLIENT) { Package pkg = requestToClientAck(msg); if (callback != null) { - callback.handle((CloudEvent) pkg.getBody(), ctx); + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8)); + callback.handle(event, ctx); } send(pkg); } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { Package pkg = asyncMessageAck(msg); if (callback != null) { - callback.handle((CloudEvent) msg, ctx); + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8)); + callback.handle(event, ctx); } send(pkg); } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { Package pkg = broadcastMessageAck(msg); if (callback != null) { - callback.handle((CloudEvent) msg, ctx); + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8)); + callback.handle(event, ctx); } send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { @@ -190,24 +201,21 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep private Package requestToClientAck(Package tcpPackage) { Package msg = new Package(); msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); - // todo: Transform json to CloudEvents - msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + msg.setBody(tcpPackage.getBody()); return msg; } private Package asyncMessageAck(Package tcpPackage) { Package msg = new Package(); msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); - // todo: Transform to CloudEvents - msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + msg.setBody(tcpPackage.getBody()); return msg; } private Package broadcastMessageAck(Package tcpPackage) { Package msg = new Package(); msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); - // todo: Transform to CloudEvents - msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + msg.setBody(tcpPackage.getBody()); return msg; }