Skip to content

Commit

Permalink
Rebase the grpc branch to master branch (#771)
Browse files Browse the repository at this point in the history
* [Issue #417] Grpc Transport Protocol support (#710)

Grpc Transport Protocol support

* [Issue #417] Create getting started instructions for Grpc transport procotol

* [Issue #417] update Grpc Message Model name to SimpleMessage

* [Issue #417] more update Grpc Message Model name to SimpleMessage

* [Issue #718] Fix readme file and protobuf file based on review comments

* [Issue #745] fix the ack bugs and cloudevent message resolver

* [Issue #744] update SDK API message model

* [Issue #744] fix the gRPC Consumer SubscribeStream Message handler

* [Issue #744] Grpc Request-Reply API support

* [Issue #744] Bug fix for Grpc Request-Reply API support

* [Issue #744] minor fix for Grpc request-Reply API

* [Issue #744] fix infinte message loop in Grpc CloudEvent request-Reply API

* [Issue #744] Fix Grpc subscribe-unsubscribe bug

* [Issue #744] Fix Data models in Grpc Request-Reply API

* [Issue #744] Code optimization for Grpc Request-Reply API

* [Issue #417] support Grpc broadcast async publish

* [Issue #718] add synchronized calls for grpc streamObserver

* supply apache header

* add checkstyle ignore for grpc

* fix checkstyle error

* fix javax.annotation.generated compile error

* fix javax.annotation.generated compile error

* supply dependencies licenses

* update known-dependencies.txt

* update known-dependencies.txt

Co-authored-by: jinrongluo <[email protected]>
  • Loading branch information
xwm1992 and jinrongluo authored Feb 21, 2022
1 parent 6e96d95 commit 10a1004
Show file tree
Hide file tree
Showing 121 changed files with 26,539 additions and 25 deletions.
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ allprojects {
configFile = new File("${rootDir}/style/checkStyle.xml")
}

checkstyleMain.exclude '**/org/apache/eventmesh/client/grpc/protos**'

dependencies {
testImplementation "junit:junit"
}
Expand Down Expand Up @@ -464,6 +466,13 @@ subprojects {
dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"

dependency "io.grpc:grpc-protobuf:1.15.0"
dependency "io.grpc:grpc-stub:1.15.0"
dependency "io.grpc:grpc-netty:1.15.0"
dependency "io.grpc:grpc-netty-shaded:1.15.0"

dependency "javax.annotation:javax.annotation-api:1.3.2"

dependency "com.github.seancfoley:ipaddress:5.3.3"
}
}
Expand Down
161 changes: 161 additions & 0 deletions docs/cn/instructions/eventmesh-runtime-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,164 @@ public class LiteMessage {
| 场景 | Server向Client发送消息请求码 | Client回复Server消息响应码 | 说明 |
| ------------------ | ---------------------------- | -------------------------- | ---------------------- |
| 客户端接收异步事件 | HTTP_PUSH_CLIENT_ASYNC(105) | retCode | retCode值为0时代表成功 |


## gRPC 协议文档

#### 1. protobuf

`eventmesh-protocol-gprc` 模块有 Eventmesh gRPC 客户端的 protobuf 文件. the protobuf 文件路径是 `/src/main/proto/eventmesh-client.proto`.

用gradle build 生成 gRPC 代码在 `/build/generated/source/proto/main`. 生成代码用于 `eventmesh-sdk-java` 模块.

#### 2. gRPC 数据模型

- 消息

以下消息数据模型用于 `publish()`, `requestReply()``broadcast()` APIs.

```
message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}
message SimpleMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}
message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}
repeated MessageItem messageItem = 4;
}
message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
```

- 订阅

以下订阅数据模型用于 `subscribe()``unsubscribe()` APIs.

```
message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;
message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}
enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}
string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}
repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
```

- 心跳

以下心跳数据模型用于 `heartbeat()` API.

```
message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}
RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;
message HeartbeatItem {
string topic = 1;
string url = 2;
}
repeated HeartbeatItem heartbeatItems = 5;
}
```

#### 3. gRPC 服务接口

- 事件生产端服务 APIs

```
service PublisherService {
# 异步事件生产
rpc publish(SimpleMessage) returns (Response);
# 同步事件生产
rpc requestReply(SimpleMessage) returns (Response);
# 批量事件生产
rpc batchPublish(BatchMessage) returns (Response);
}
```

- 事件消费端服务 APIs

```
service ConsumerService {
# 所消费事件通过 HTTP Webhook推送事件
rpc subscribe(Subscription) returns (Response);
# 所消费事件通过 TCP stream推送事件
rpc subscribeStream(Subscription) returns (stream SimpleMessage);
rpc unsubscribe(Subscription) returns (Response);
}
```

- 客户端心跳服务 API

```
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ sudo vi eventmesh.properties
|----------------------------|-------|----------------------------|
| eventMesh.server.http.port | 10105 | EventMesh http server port |
| eventMesh.server.tcp.port | 10000 | EventMesh tcp server port |

| eventMesh.server.grpc.port | 10205 | EventMesh grpc server port |


### 配置 rocketmq-client.properties
Expand Down
1 change: 1 addition & 0 deletions docs/cn/instructions/eventmesh-runtime-quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ eventMesh.connector.plugin.type=rocketmq
| ---------------------- | ------ | ----------------------- |
| eventMesh.server.http.port | 10105 | EventMesh http 服务端口 |
| eventMesh.server.tcp.port | 10000 | EventMesh tcp 服务端口 |
| eventMesh.server.grpc.port | 10205 | EventMesh grpc 服务端口 |

**rocketmq-client.properties**

Expand Down
53 changes: 51 additions & 2 deletions docs/cn/instructions/eventmesh-sdk-java-quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
>
> EventMesh-sdk-java支持异步消息和广播消息。异步消息表示生产者只发送消息,不关心回复消息。广播消息表示生产者发送一次消息,所有订阅广播主题的消费者都将收到消息
>
> EventMesh-sdk-java支持HTTP和TCP协议
> EventMesh-sdk-java支持HTTP,TCP 和 GRPC 协议
TCP 和 HTTP 示例都在**eventmesh-example**模块下
TCP, HTTP 和 GRPC 示例都在**eventmesh-example**模块下

### 1. TCP DEMO

Expand Down Expand Up @@ -70,5 +70,54 @@ TCP 和 HTTP 示例都在**eventmesh-example**模块下
运行org.apache.eventmesh.http.demo.pub.eventmeshmessage.AsyncPublishInstance的主要方法
```

### 3. GRPC 演示

> eventmesh-sdk-java 实现了 gRPC 协议. 它能异步和同步发送事件到 eventmesh-runtime.
> 它可以通过webhook和事件流方式订阅消费事件, 同时也支持 CNCF CloudEvents 协议.
<h4> 异步事件发送 和 webhook订阅 </h4>

> Async生产者 异步发送事件到 eventmesh-runtime, 不需要等待事件储存到 `event-store`
> 在webhook 消费者, 事件推送到消费者的http endpoint url。这个URL在消费者的 `Subscription` 模型定于. 这方法跟前面的Http eventmsh client类似。
- 在rocketmq 创建主题 TEST-TOPIC-GRPC-ASYNC
- 启动 publisher 发送事件

```
运行 org.apache.eventmesh.grpc.pub.eventmeshmessage.AsyncPublishInstance 的主要方法
```

- 启动 webhook 消费者

```
运行 org.apache.eventmesh.grpc.sub.app.SpringBootDemoApplication 的主要方法
```

<h4> 同步事件发送和事件流订阅 </h4>

> 同步生产者 发送事件到 eventmesh-runtime, 同时等待事件储存到 `event-store`
> 在事件流消费者,事件以流的形式推送到 `ReceiveMsgHook` 客户端。 这方法类似 eventmesh client.
- 在rocketmq 创建主题 TEST-TOPIC-GRPC-RR
- 启动 Request-Reply publisher 发送事件

```
运行 org.apache.eventmesh.grpc.pub.eventmeshmessage.RequestReplyInstance 的主要方法
```

- 启动 stream subscriber

```
运行 org.apache.eventmesh.grpc.sub.EventmeshAsyncSubscribe 的主要方法
```

<h4> 批量事件发布 </h4>

> 批量发布多个事件到 eventmesh-runtime. 这是异步操作
- 在rocketmq 创建主题 TEST-TOPIC-GRPC-ASYNC
- 启动 publisher 来批量发布事件

```
运行 org.apache.eventmesh.grpc.pub.eventmeshmessage.BatchPublishInstance 的主要方法
```
Loading

0 comments on commit 10a1004

Please sign in to comment.