Skip to content

Commit

Permalink
[Feature #562] Support CloudEvents for pub/sub in EventMesh runtime (#…
Browse files Browse the repository at this point in the history
…587)

* [Feature #564] Support CloudEvents protocols for pub/sub in EventMesh-feature design

* support cloudevents api in eventmesh-connector-api module

* fix checkStyle

* fix checkStyle

* fix checkStyle

* 1.support LifeCycle.java
2.update Consumer and Producer

* fix remove the extra blank line

* support cloudEvents

* support cloudEvents

* support cloudEvents

* support cloudEvents

* support cloudevents

* support cloudEvents

* support cloudEvents

* support cloudEvents
  • Loading branch information
xwm1992 authored Nov 16, 2021
1 parent 3459ee4 commit d3a36b5
Show file tree
Hide file tree
Showing 38 changed files with 871 additions and 749 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public class Constants {

public static final String HTTPS_PROTOCOL_PREFIX = "https://";

public static final String PROTOCOL_TYPE = "protocol_type";

public static final String PROTOCOL_VERSION = "protocol_version";

public static final String PROTOCOL_DESC = "protocol_desc";

public static final int DEFAULT_HTTP_TIME_OUT = 3000;

public static final String EVENTMESH_MESSAGE_CONST_TTL = "ttl";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public static class ReplyMessage {
public String body;
public Map<String, String> properties;

public ReplyMessage(String topic, String body){
this.topic = topic;
this.body = body;
}

public ReplyMessage(String topic, String body, Map<String, String> properties) {
this.topic = topic;
this.body = body;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ public class Header {

private Command cmd;
private int code;
private String dsec;
private String desc;
private String seq;
private Map<String,Object> properties;
private Map<String, Object> properties;

public Header() {
}

public Header(Command cmd, int code, String dsec, String seq) {
public Header(Command cmd, int code, String desc, String seq) {
this.cmd = cmd;
this.code = code;
this.dsec = dsec;
this.desc = desc;
this.seq = seq;
}

public Header(int code, String dsec, String seq, Map<String, Object> properties) {
public Header(int code, String desc, String seq, Map<String, Object> properties) {
this.code = code;
this.dsec = dsec;
this.desc = desc;
this.seq = seq;
this.properties = properties;
}
Expand All @@ -61,12 +61,12 @@ public void setCode(int code) {
this.code = code;
}

public String getDsec() {
return dsec;
public String getDesc() {
return desc;
}

public void setDsec(String dsec) {
this.dsec = dsec;
public void setDesc(String desc) {
this.desc = desc;
}

public String getSeq() {
Expand Down Expand Up @@ -106,7 +106,7 @@ public String toString() {
return "Header{" +
"cmd=" + cmd +
", code=" + code +
", dsec='" + dsec + '\'' +
", desc='" + desc + '\'' +
", seq='" + seq + '\'' +
", properties=" + properties +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.eventmesh.api;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;

public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {

Expand All @@ -34,8 +32,4 @@ public void setAbstractContext(AbstractContext abstractContext) {

public abstract void commit(EventMeshAction action);

@Override
public void commit(Action action) {
throw new UnsupportedOperationException("not support yet");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.eventmesh.api;

import io.cloudevents.CloudEvent;

public interface RequestReplyCallback {

void onSuccess(CloudEvent event);

void onException(Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package org.apache.eventmesh.api.factory;

import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.api.producer.Producer;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

/**
* The factory to get connector {@link MeshMQProducer} and {@link MeshMQPushConsumer}
* The factory to get connector {@link Producer} and {@link Consumer}
*/
public class ConnectorPluginFactory {

Expand All @@ -34,8 +34,8 @@ public class ConnectorPluginFactory {
* @param connectorPluginName plugin name
* @return MeshMQProducer instance
*/
public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
public static Producer getMeshMQProducer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(Producer.class, connectorPluginName);
}

/**
Expand All @@ -44,8 +44,8 @@ public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
* @param connectorPluginName plugin name
* @return MeshMQPushConsumer instance
*/
public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
public static Consumer getMeshMQPushConsumer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(Consumer.class, connectorPluginName);
}

private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.eventmesh.api.producer;

import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.*;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

Expand All @@ -46,6 +43,8 @@ public interface Producer extends LifeCycle {

void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception;

void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception;

boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallback) throws Exception;

void checkTopicExist(String topic) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.eventmesh.connector.standalone.broker.task;

import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
Expand All @@ -33,7 +32,7 @@ public class SubScribeTask implements Runnable {

private String topicName;
private StandaloneBroker standaloneBroker;
private AsyncMessageListener listener;
private EventListener listener;
private volatile boolean isRunning;

private AtomicInteger offset;
Expand All @@ -42,7 +41,7 @@ public class SubScribeTask implements Runnable {

public SubScribeTask(String topicName,
StandaloneBroker standaloneBroker,
AsyncMessageListener listener) {
EventListener listener) {
this.topicName = topicName;
this.standaloneBroker = standaloneBroker;
this.listener = listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

dependencies {
implementation project(":eventmesh-spi")
implementation project(":eventmesh-common")
api project(":eventmesh-common")

implementation "io.cloudevents:cloudevents-core"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.eventmesh.protocol.api;

import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;

import java.util.List;

/**
* Protocol transformer SPI interface, all protocol plugin should implementation.
*
Expand All @@ -32,24 +35,31 @@
* @since 1.3.0
*/
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.PROTOCOL)
public interface ProtocolAdaptor {
public interface ProtocolAdaptor<T> {

/**
* transform protocol to {@link CloudEvent}.
*
* @param protocol input protocol
* @return cloud event
*/
CloudEventV1 toCloudEventV1(Package protocol) throws ProtocolHandleException;
CloudEvent toCloudEvent(T protocol) throws ProtocolHandleException;

/**
* transform protocol to {@link CloudEvent} list.
*
* @param protocol input protocol
* @return list cloud event
*/
List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleException;

/**
* Transform {@link CloudEvent} to target protocol.
*
* @param cloudEvent clout event
* @return target protocol
*/
Package fromCloudEventV1(CloudEventV1 cloudEvent) throws ProtocolHandleException;
T fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException;

/**
* Get protocol type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,40 @@

package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;

import io.cloudevents.core.v1.CloudEventV1;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import java.util.List;

/**
* CloudEvents protocol adaptor, used to transform CloudEvents message to CloudEvents message.
*
* @since 1.3.0
*/
public class CloudEventsProtocolAdaptor implements ProtocolAdaptor {
public class CloudEventsProtocolAdaptor<T> implements ProtocolAdaptor<T> {

@Override
public CloudEvent toCloudEvent(T cloudEvent) {

if (cloudEvent instanceof Package){
//todo:convert package to cloudevents
}else if (cloudEvent instanceof HttpCommand){
//todo:convert httpCommand to cloudevents
}
return null;
}

@Override
public CloudEventV1 toCloudEventV1(Package cloudEvent) {
public List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleException {
return null;
}

@Override
public Package fromCloudEventV1(CloudEventV1 cloudEvent) {
public T fromCloudEvent(CloudEvent cloudEvent) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,36 @@

package org.apache.eventmesh.protocol.openmessage;

import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import io.openmessaging.api.Message;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import java.util.List;

/**
* OpenMessage protocol adaptor, used to transform protocol between
* {@link CloudEvent} with {@link Message}.
*
* @since 1.3.0
*/
public class OpenMessageProtocolAdaptor implements ProtocolAdaptor {
public class OpenMessageProtocolAdaptor<T> implements ProtocolAdaptor<T> {

@Override
public CloudEvent toCloudEvent(T message) {
return null;
}

@Override
public CloudEventV1 toCloudEventV1(Package message) {
public List<CloudEvent> toBatchCloudEvent(T protocol) throws ProtocolHandleException {
return null;
}

@Override
public Package fromCloudEventV1(CloudEventV1 cloudEvent) {
public T fromCloudEvent(CloudEvent cloudEvent) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import java.util.List;
import java.util.Properties;

import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,7 +32,7 @@ public class MQConsumerWrapper extends MQWrapper {

public Logger logger = LoggerFactory.getLogger(this.getClass());

protected MeshMQPushConsumer meshMQPushConsumer;
protected Consumer meshMQPushConsumer;

public MQConsumerWrapper(String connectorPluginType) {
this.meshMQPushConsumer = ConnectorPluginFactory.getMeshMQPushConsumer(connectorPluginType);
Expand All @@ -43,7 +42,7 @@ public MQConsumerWrapper(String connectorPluginType) {
}
}

public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
public void subscribe(String topic, EventListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}

Expand Down Expand Up @@ -72,7 +71,7 @@ public synchronized void shutdown() throws Exception {
// meshMQPushConsumer.registerMessageListener(messageListenerConcurrently);
// }

public void updateOffset(List<Message> msgs, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(msgs, eventMeshConsumeConcurrentlyContext);
public void updateOffset(List<CloudEvent> events, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext);
}
}
Loading

0 comments on commit d3a36b5

Please sign in to comment.