Skip to content

Commit

Permalink
update Eventmeshmessage plugin (#599)
Browse files Browse the repository at this point in the history
* [Feature #564] Support CloudEvents protocols for pub/sub in EventMesh-feature design

* support cloudevents api in eventmesh-connector-api module

* fix checkStyle

* fix checkStyle

* fix checkStyle

* 1.support LifeCycle.java
2.update Consumer and Producer

* fix remove the extra blank line

* support cloudEvents

* Add files via upload

* Update README.md

* support cloudEvents

* support cloudEvents

* [ISSUE #580] Add checkstyle gradle plugin (#581)

* Add checkstyle gradle plugin, change plugin package

* skip check in ci

* support cloudEvents

* support cloudevents

* update wechat-official qr code

* update mesh-helper qr code

* Add files via upload

* update README.md

* update README.md

* Update .asf.yaml

* support cloudEvents

* support cloudEvents

* [ISSUE #588] Fix typo in README.md (#589)

close #588

* support cloudEvents

* [Bug #590] Consumer subscription topic is invalid (#590) (#592)

* [Bug #590] Consumer subscription topic is invalid (#590)

* [Bug #590] Consumer subscription topic is invalid (#590)

close #590

* support cloudEvents adaptor

* [Feature #562] Implement CloudEvents adaptor

* [Feature #562] Implement EventMeshMessage protocol adaptor

* supplement apache header

* 1.update package name
2.support build.gradle and gradle.properties
3.support ProtocolTransportObject

Co-authored-by: Eason Chen <[email protected]>
Co-authored-by: Wenjun Ruan <[email protected]>
Co-authored-by: Nicholas Zhan <[email protected]>
Co-authored-by: hagsyn <[email protected]>
  • Loading branch information
5 people authored Nov 19, 2021
1 parent a876856 commit 902c4a8
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
implementation "io.cloudevents:cloudevents-core"

testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
testImplementation "io.cloudevents:cloudevents-core"
testImplementation "junit:junit"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pluginType=protocol
pluginName=eventmeshmessage
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,32 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage;
package org.apache.eventmesh.protocol.meshmessage;

import io.cloudevents.CloudEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.ProtocolTransportObject;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.eventmeshmessage.resolver.tcp.TcpMessageProtocolResolver;
import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.meshmessage.resolver.tcp.TcpMessageProtocolResolver;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class EventMeshMessageProtocolAdaptor<T> implements ProtocolAdaptor<T> {
public class MeshMessageProtocolAdaptor<T extends ProtocolTransportObject>
implements ProtocolAdaptor<ProtocolTransportObject> {

@Override
public CloudEvent toCloudEvent(T protocol) throws ProtocolHandleException {
public CloudEvent toCloudEvent(ProtocolTransportObject protocol) throws ProtocolHandleException {
if (protocol instanceof Package) {
Header header = ((Package) protocol).getHeader();
Object body = ((Package) protocol).getBody();
Expand Down Expand Up @@ -78,16 +79,18 @@ private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventm
}

@Override
public List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleException {
public List<CloudEvent> toBatchCloudEvent(ProtocolTransportObject protocol) throws ProtocolHandleException {
return null;
}

@Override
public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException {
String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();

if (StringUtils.equals("http", protocolDesc)) {
return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
// todo: return command, set cloudEvent.getData() to content?
return null;
// return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8);
} else if (StringUtils.equals("tcp", protocolDesc)) {
return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent);
} else {
Expand All @@ -97,6 +100,6 @@ public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleExcepti

@Override
public String getProtocolType() {
return EventMeshMessageProtocolConstant.PROTOCOL_NAME;
return MeshMessageProtocolConstant.PROTOCOL_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage;
package org.apache.eventmesh.protocol.meshmessage;

public enum EventMeshMessageProtocolConstant {
public enum MeshMessageProtocolConstant {
;
public static final String PROTOCOL_NAME = "eventmeshmessage";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http;
package org.apache.eventmesh.protocol.meshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.http.body.Body;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http;
package org.apache.eventmesh.protocol.meshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http;
package org.apache.eventmesh.protocol.meshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
Expand All @@ -30,7 +28,6 @@
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.protocol.eventmeshmessage.resolver.tcp;
package org.apache.eventmesh.protocol.meshmessage.resolver.tcp;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
Expand All @@ -26,7 +26,7 @@
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.eventmeshmessage.EventMeshMessageProtocolConstant;
import org.apache.eventmesh.protocol.meshmessage.MeshMessageProtocolConstant;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
Expand All @@ -50,7 +50,7 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH
protocolType, protocolVersion, protocolDesc));
}

if (!StringUtils.equals(EventMeshMessageProtocolConstant.PROTOCOL_NAME, protocolType)) {
if (!StringUtils.equals(MeshMessageProtocolConstant.PROTOCOL_NAME, protocolType)) {
throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

eventmeshmessage=org.apache.eventmesh.protocol.eventmeshmessage.EventMeshMessageProtocolAdaptor
eventmeshmessage=org.apache.eventmesh.protocol.meshmessage.MeshMessageProtocolAdaptor
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ include 'eventmesh-protocol-plugin'
include 'eventmesh-protocol-plugin:eventmesh-protocol-api'
include 'eventmesh-protocol-plugin:eventmesh-protocol-openmessage'
include 'eventmesh-protocol-plugin:eventmesh-protocol-cloudevents'
include 'eventmesh-protocol-plugin:eventmesh-protocol-eventmeshmessage'
include 'eventmesh-protocol-plugin:eventmesh-protocol-meshmessage'

0 comments on commit 902c4a8

Please sign in to comment.