Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #563] SDK SUPPORT CLOUD EVENT #575

Merged
merged 9 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@

package org.apache.eventmesh.common.protocol.tcp;

import java.util.HashMap;
import java.util.Map;

public class Header {

private Command cmd;
private int code;
private String msg;
private String dsec;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is dsec mean? desc?

private String seq;
private Map<String,Object> properties;

public Header() {
}

public Header(Command cmd, int code, String msg, String seq) {
public Header(Command cmd, int code, String dsec, String seq) {
this.cmd = cmd;
this.code = code;
this.msg = msg;
this.dsec = dsec;
this.seq = seq;
}

public Header(int code, String dsec, String seq, Map<String, Object> properties) {
this.code = code;
this.dsec = dsec;
this.seq = seq;
this.properties = properties;
}

public Command getCommand() {
Expand All @@ -50,12 +61,12 @@ public void setCode(int code) {
this.code = code;
}

public String getMsg() {
return msg;
public String getDsec() {
return dsec;
}

public void setMsg(String msg) {
this.msg = msg;
public void setDsec(String dsec) {
this.dsec = dsec;
}

public String getSeq() {
Expand All @@ -66,13 +77,38 @@ public void setSeq(String seq) {
this.seq = seq;
}

public Map<String, Object> getProperties() {
return properties;
}

public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}

public void putProperty(final String name, final Object value) {
if (null == this.properties) {
this.properties = new HashMap<>();
}

this.properties.put(name, value);
}

public Object getProperty(final String name) {
if (null == this.properties) {
this.properties = new HashMap<>();
}

return this.properties.get(name);
}

@Override
public String toString() {
return "Header{" +
"cmd=" + cmd +
", code=" + code +
", msg='" + msg + '\'' +
", seq='" + seq + '\'' +
'}';
"cmd=" + cmd +
", code=" + code +
", dsec='" + dsec + '\'' +
", seq='" + seq + '\'' +
", properties=" + properties +
'}';
}
}
2 changes: 1 addition & 1 deletion eventmesh-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'io.netty:netty-all'

implementation "io.cloudevents:cloudevents-core"
testImplementation project(":eventmesh-sdk-java")
testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive broadcast msg==============={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncResponse implements ReceiveMsgHook {
public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(SyncResponse.class);

Expand Down Expand Up @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) {
Package pkg = EventMeshTestUtils.rrResponse(msg);
ctx.writeAndFlush(pkg);
}

@Override
public EventMeshMessage convert(Package pkg) {
return null;
}
}
2 changes: 2 additions & 0 deletions eventmesh-sdk-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation "io.netty:netty-all"
implementation "org.apache.httpcomponents:httpclient"

implementation "io.cloudevents:cloudevents-core"

testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-common")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.client.tcp;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.SubscriptionType;
Expand All @@ -31,6 +32,10 @@ public interface EventMeshClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void init() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -39,6 +40,10 @@ public interface SimplePubClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void registerBusiHandler(ReceiveMsgHook handler) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,6 @@ public class EventMeshCommon {
public static String PREFIX_SESSION_TPS_STAT_EVENTSEND = "event_send_tps_";

public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_";

public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
Expand Down Expand Up @@ -76,6 +77,18 @@ public static Package asyncMessageAck(Package in) {
return msg;
}

public static Package asyncCloudEvent(CloudEvent cloudEvent) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0,
null, generateRandomString(seqLength)));
msg.getHeader().putProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(PropertyConst.PROPERTY_CLOUD_EVENT_VERSION,
cloudEvent.getSpecVersion().toString());
msg.setBody(cloudEvent);
return msg;
}

public static Package broadcastMessageAck(Package in) {
Package msg = new Package();
msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, in.getHeader().getSeq()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.eventmesh.client.tcp.common;

/**
* properties key name
*/
public class PropertyConst {

public static String PROPERTY_MESSAGE_PROTOCOL = "message_protocol";

public static String PROPERTY_CLOUD_EVENT_VERSION = "cloud_event_version";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.common.protocol.tcp.Package;

public interface ReceiveMsgHook {
/**
* ReceiveMsgHook.
*
* @param <T> receive message type.
*/
public interface ReceiveMsgHook<T> {
void handle(Package msg, ChannelHandlerContext ctx);

T convert(Package pkg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method just execute by itself, it's not suggested to add this in interface. This will make interface unclear.

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp.impl;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.SimplePubClient;
import org.apache.eventmesh.client.tcp.SimpleSubClient;
Expand Down Expand Up @@ -73,10 +74,20 @@ public Package publish(Package msg, long timeout) throws Exception {
return this.pubClient.publish(msg, timeout);
}

@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
return this.pubClient.publish(cloudEvent, timeout);
}

public void broadcast(Package msg, long timeout) throws Exception {
this.pubClient.broadcast(msg, timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
this.pubClient.broadcast(cloudEvent, timeout);
}

public void init() throws Exception {
this.subClient.init();
this.pubClient.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand All @@ -29,6 +30,7 @@
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.PropertyConst;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
Expand Down Expand Up @@ -145,6 +147,25 @@ public Package publish(Package msg, long timeout) throws Exception {
return io(msg, timeout);
}


@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
return io(MessageUtils.asyncCloudEvent(cloudEvent), timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
super.send(msg);
}

/**
* Send broadcast message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Loading