Merge pull request apache#87 from yuanxiaodong/window
Window
xstorm1 authored Oct 25, 2021
2 parents ed95a8f + 900e3cb commit 7250cbc
Showing 252 changed files with 11,867 additions and 2,993 deletions.
106 changes: 106 additions & 0 deletions README-chinese.md
@@ -0,0 +1,106 @@
# RocketMQ Streams
## Features

* Lightweight deployment: can run standalone or as a cluster
* Multiple input and output types: sources support RocketMQ; sinks support databases, RocketMQ, and more

## DataStream Example

```java
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;

DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");

source
    .fromFile("~/admin/data/text.txt", false)
    .map(message -> message)
    .toPrint(1)
    .start();
```

## Maven Repository

```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams-clients</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```

# Core API

rocketmq-streams provides a set of high-level APIs that make it easy for users to write stream-processing programs implementing their own business logic.

## StreamBuilder

StreamBuilder is used to build the source of a stream task. It provides two methods, ```dataStream()``` and ```tableStream()```, which return a DataStreamSource and a TableStreamSource respectively.

+ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used to implement stream tasks with the fluent (operator-chaining) API;
+ [tableStream(nameSpaceName,pipelineName)]() returns a TableStreamSource instance, used to implement stream tasks with scripts;

## DataStream API

### Source

DataStreamSource is the source class for fluent-style programming; it connects to various data sources and pulls data from the major message queues.

+ ```fromFile``` reads data from a file; it takes two parameters
  + ```filePath``` the file path, required
  + ```isJsonData``` whether the data is JSON, optional, defaults to ```true```

+ ```fromRocketmq``` reads data from RocketMQ; it takes four parameters
  + ```topic``` the RocketMQ topic name, required
  + ```groupName``` the consumer group name, required
  + ```isJson``` whether the data is JSON, optional
  + ```tags``` the RocketMQ tags used to filter messages, optional

+ ```from``` a custom data source; implement the ISource interface to provide your own source
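
As a sketch of the parameters above — the namespace, pipeline, topic, group, and tag names are placeholders, and the four-argument overload is assumed from the parameter list rather than a verified signature — a RocketMQ source could be wired up like this:

```java
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;

// "example_namespace"/"example_pipeline" and the topic/group/tag values are placeholders
DataStreamSource source = StreamBuilder.dataStream("example_namespace", "example_pipeline");

source
    // topic and groupName are required; isJson and tags are optional
    .fromRocketmq("example_topic", "example_consumer_group", false, "tagA")
    .toPrint(1)
    .start();
```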

### transform

transform allows the data from the input source to be modified as it flows to the next step. The DataStream API includes several transform classes, such as ```DataStream```, ```JoinStream```, ```SplitStream```, and ```WindowStream```.

#### DataStream

DataStream implements a set of common stream-processing operators

+ ```map``` returns a new DataStream by passing each record of the source through the function func
+ ```flatmap``` similar to map; each input item maps to 0 or more output items
+ ```filter``` returns a new DataStream containing only the records of the source for which func returns true
+ ```forEach``` executes the function func once on each record and returns a new DataStream
+ ```selectFields``` returns the specified fields of each record, producing a new DataStream
+ ```operate``` executes a custom function on each record and returns a new DataStream
+ ```script``` executes a script against the fields of each record, producing new fields and a new DataStream
+ ```toPrint``` prints the results to the console and produces a new DataStreamAction instance
+ ```toFile``` saves the results to a file and produces a new DataStreamAction instance
+ ```toDB``` saves the results to a database
+ ```toRocketmq``` writes the results to RocketMQ
+ ```to``` writes the results to a given store through a custom ISink implementation
+ ```window``` performs statistics within a window, usually combined with ```groupBy```; ```window()``` defines the window size, and ```groupBy()``` defines the grouping key(s), of which several can be specified
+ ```count``` counts the records within the window
+ ```min``` returns the minimum of the aggregated value within the window
+ ```max``` returns the maximum of the aggregated value within the window
+ ```avg``` returns the average of the aggregated value within the window
+ ```sum``` returns the sum of the aggregated value within the window
+ ```reduce``` performs a custom aggregation within the window
+ ```join``` joins two streams on a condition, merging them into one stream for further processing
+ ```union``` merges two streams
+ ```split``` splits a stream into several streams by tag for downstream processing
+ ```with``` specifies strategies used during processing, such as the checkpoint storage strategy and the state storage strategy
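
The window-related operators above can be chained along these lines. This is only an illustrative sketch: the window-definition helpers shown (```TumblingWindow```, ```Time```), the ```toDataSteam()``` conversion, and the exact overloads are assumptions, not verified signatures.

```java
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;

StreamBuilder.dataStream("example_namespace", "example_pipeline")
    .fromFile("~/admin/data/text.txt", true)
    // count records per 5-second tumbling window, grouped by the "user" field
    .window(TumblingWindow.of(Time.seconds(5)))
    .groupBy("user")
    .count("total")
    .toDataSteam()  // convert the window stream back to a DataStream (method name as assumed here)
    .toPrint(1)
    .start();
```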

# Strategy

The strategy mechanism controls the engine's underlying runtime behavior, such as how checkpoints and state are stored; control over windows, two-stream joins, and more will be added later. All strategies are passed in through the ```with``` operator, and multiple strategy types can be passed at once.

```java
// specify the checkpoint storage strategy
source
    .fromRocketmq("TSG_META_INFO", "")
    .map(message -> message + "--")
    .toPrint(1)
    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX", "", "", 0L))
    .start();
```
2 changes: 1 addition & 1 deletion build_without_test.sh
@@ -5,7 +5,7 @@
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# the License. You may obtain a copy of the License ato
#
# http://www.apache.org/licenses/LICENSE-2.0
#
19 changes: 13 additions & 6 deletions pom.xml
@@ -52,7 +52,8 @@
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>

<module>rocketmq-streams-checkpoint</module>
<module>rocketmq-streams-connectors</module>
</modules>

<properties>
@@ -69,7 +70,7 @@
<commons-logging.version>1.1</commons-logging.version>
<spring.version>3.2.13.RELEASE</spring.version>
<auto-service.version>1.0-rc5</auto-service.version>
<mysql-connector.version>8.0.26</mysql-connector.version>
<mysql-connector.version>5.1.40</mysql-connector.version>
<fastjson.version>1.2.78</fastjson.version>
<quartz.version>2.2.1</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
@@ -88,6 +89,8 @@
<scala-library.version>2.12.4</scala-library.version>
<logback-core.version>1.2.3</logback-core.version>
<minio.version>3.0.10</minio.version>
<rocksdbjni.version>6.6.4</rocksdbjni.version>

</properties>


@@ -112,13 +115,12 @@
<exclude>.asf.yaml</exclude>
<exclude>README.md</exclude>
<exclude>README-Chinese.md</exclude>
<exclude>quick_start.md</exclude>
<exclude>QUICKSTART.md</exclude>
<exclude>.github/**</exclude>
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
<exclude>**/*.txt</exclude>
<exclude>**/*.cs</exclude>
<exclude>**/*.sql</exclude>
</excludes>
</configuration>
</plugin>
@@ -178,11 +180,17 @@
<!-- ================================================= -->
<!-- rocketmq streams library -->
<!-- ================================================= -->

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-dbinit</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
@@ -285,11 +293,10 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-dbinit</artifactId>
<artifactId>rocketmq-streams-connectors</artifactId>
<version>${project.version}</version>
</dependency>


<!-- ================================================= -->
<!-- rocketmq library -->
<!-- ================================================= -->
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.db;

import java.io.Serializable;

/**
* @description
*/
public class CycleSplit extends DynamicMultipleDBSplit implements Serializable {

private static final long serialVersionUID = 4309494143340650195L;
String cyclePeriod;

public CycleSplit(){

}

public CycleSplit(String version){
this.cyclePeriod = version;
}

@Override
public String getQueueId() {
return String.join("_", logicTableName, suffix, cyclePeriod);
}

public String getCyclePeriod() {
return cyclePeriod;
}

public void setCyclePeriod(String cyclePeriod) {
this.cyclePeriod = cyclePeriod;
}

@Override
public String toString() {
return "CycleSplit{" +
"cyclePeriod='" + cyclePeriod + '\'' +
", suffix='" + suffix + '\'' +
", logicTableName='" + logicTableName + '\'' +
'}';
}
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.db;

import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;

/**
* @description
*/
public class DynamicMultipleDBSplit extends BasedConfigurable implements ISplit<DynamicMultipleDBSplit, String> {

String suffix;
String logicTableName;

public DynamicMultipleDBSplit() {
}

public DynamicMultipleDBSplit(String suffix, String logicTableName) {
this.suffix = suffix;
this.logicTableName = logicTableName;
}

public String getSuffix() {
return suffix;
}

public void setSuffix(String suffix) {
this.suffix = suffix;
}

public String getLogicTableName() {
return logicTableName;
}

public void setLogicTableName(String logicTableName) {
this.logicTableName = logicTableName;
}

@Override
public String getQueueId() {
return logicTableName + "_" + suffix;
}

@Override
public String getPlusQueueId() {
throw new RuntimeException("unsupported getPlusQueueId!");
}

@Override
public String getQueue() {
return logicTableName + "_" + suffix;
}

@Override
public int compareTo(DynamicMultipleDBSplit o) {
return getQueue().compareTo(o.getQueue());
}

@Override
public String toString() {
return "DynamicMultipleDBSplit{" +
"logicTableName='" + logicTableName + '\'' +
", suffix='" + suffix + '\'' +
'}';
}
}
