Skip to content

Commit

Permalink
[Feature #562] support cloudevents api in eventmesh-connector-api mod…
Browse files Browse the repository at this point in the history
…ule (#578)

* [Feature #564] Support CloudEvents protocols for pub/sub in EventMesh-feature design

* support cloudevents api in eventmesh-connector-api module

* fix checkStyle

* fix checkStyle

* fix checkStyle

* 1.support LifeCycle.java
2.update Consumer and Producer

* fix remove the extra blank line
  • Loading branch information
xwm1992 authored Nov 10, 2021
1 parent 1feaaa7 commit deee538
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
dependencies {
implementation project(":eventmesh-spi")
implementation project(":eventmesh-common")
api 'io.cloudevents:cloudevents-core'
api 'io.openmessaging:openmessaging-api'
api 'io.dropwizard.metrics:metrics-core'
api "io.dropwizard.metrics:metrics-healthchecks"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;


public abstract class AsyncConsumeContext {

public abstract void commit(EventMeshAction action);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;


import io.cloudevents.CloudEvent;

/**
* Event listener, registered for consume messages by consumer.
*
* <p>
* <strong>
* Thread safe requirements: this interface will be invoked by multi threads,
* so users should keep thread safe during the consume process.
* </strong>
* </p>
*/
public interface EventListener {

void consume(final CloudEvent cloudEvent, final AsyncConsumeContext context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

public enum EventMeshAction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import io.openmessaging.api.Action;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.api.producer.Producer;

/**
* The {@code LifeCycle} defines a lifecycle interface for a OMS related service endpoint,
* like {@link Producer}, {@link Consumer}, and so on.
*/
public interface LifeCycle {

boolean isStarted();

boolean isClosed();

void start();

void shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.producer.Producer;

import io.cloudevents.CloudEvent;

/**
* Call back interface used in {@link Producer#sendAsync(CloudEvent, SendCallback)}.
*/
public interface SendCallback {

void onSuccess(final SendResult sendResult);

void onException(final OnExceptionContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

public class SendResult {
private String messageId;

private String topic;

public String getMessageId() {
return messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

@Override
public String toString() {
return "SendResult[topic=" + topic + ", messageId=" + messageId + ']';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.consumer;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import java.util.List;
import java.util.Properties;

import io.cloudevents.CloudEvent;



/**
* Consumer Interface.
*/
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface Consumer extends LifeCycle {

void init(Properties keyValue) throws Exception;

void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);

void subscribe(String topic, final EventListener listener) throws Exception;

void unsubscribe(String topic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.exception;

public class ConnectorRuntimeException extends RuntimeException {

public ConnectorRuntimeException() {

}

public ConnectorRuntimeException(String message) {
super(message);
}

public ConnectorRuntimeException(Throwable throwable) {
super(throwable);
}

public ConnectorRuntimeException(String message, Throwable throwable) {
super(message, throwable);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.exception;

public class OnExceptionContext {

private String messageId;

private String topic;

/**
* Detailed exception stack information.
*/
private ConnectorRuntimeException exception;


public String getMessageId() {
return messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}


public ConnectorRuntimeException getException() {
return exception;
}

public void setException(ConnectorRuntimeException exception) {
this.exception = exception;
}
}
Loading

0 comments on commit deee538

Please sign in to comment.