Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Support protobuf with flink #255

Merged
28 changes: 28 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: install protoc
run: |
sudo -E apt update && sudo -E apt install -y wget
cd /tmp
mkdir protoc
wget -O protoc-3.15.5-linux-x86_64.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.15.5/protoc-3.15.5-linux-x86_64.zip
unzip protoc-3.15.5-linux-x86_64.zip -d protoc
chmod 755 -R protoc
BASE=/usr/local
sudo rm -rf $BASE/include/google/protobuf/
sudo cp protoc/bin/protoc $BASE/bin
sudo cp -R protoc/include/* $BASE/include
- name: Run check and test
run: |
./tools/retry.sh mvn -B -ntp clean install -Pscala-2.11
Expand All @@ -47,9 +59,25 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: install protoc
run: |
sudo -E apt update && sudo -E apt install -y wget
cd /tmp
mkdir protoc
wget -O protoc-3.15.5-linux-x86_64.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.15.5/protoc-3.15.5-linux-x86_64.zip
unzip protoc-3.15.5-linux-x86_64.zip -d protoc
chmod 755 -R protoc
BASE=/usr/local
sudo rm -rf $BASE/include/google/protobuf/
sudo cp protoc/bin/protoc $BASE/bin
sudo cp -R protoc/include/* $BASE/include
- name: Run check and test
run: |
./tools/retry.sh mvn -B -ntp clean install -Pscala-2.11

- name: Run check and test
run: |
mvn -pl flink-protobuf test-compile jar:test-jar install
./tools/retry.sh mvn -B -ntp clean install -Pscala-2.12
unit-tests-local:
runs-on: ubuntu-latest
Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,35 @@ For `FlinkPulsarSource` and `FlinkPulsarSink`, you can use one of the following

For details about authentication configuration, see [Pulsar Security](https://pulsar.apache.org/docs/en/security-overview/).

## ProtoBuf supports [experimental features].

This feature is based on [Flink: New Format of protobuf](https://github.com/apache/flink/pull/14376) and is currently pending merge.
Example of using protobuf in sql:
```
create table pulsar (
a INT,
b BIGINT,
c BOOLEAN,
d FLOAT,
e DOUBLE,
f VARCHAR(32),
g BYTES,
h VARCHAR(32),
f_abc_7d INT,
`eventTime` TIMESTAMP(3) METADATA,
compute as a + 1,
watermark for eventTime as eventTime
) with (
'connector' = 'pulsar',
'topic' = 'test-protobuf',
'service-url' = 'pulsar://localhost:6650',
'admin-url' = 'http://localhost:8080',
'scan.startup.mode' = 'earliest',
'format' = 'protobuf',
'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.SimpleTest'
)

INSERT INTO pulsar VALUES (1,2,false,0.1,0.01,'haha', ENCODE('1', 'utf-8'), 'IMAGES',1, TIMESTAMP '2020-03-08 13:12:11.123');
```
Requirement: `SimpleTest` class must implement `GeneratedMessageV3`.
Since the Flink Format: ProtoBuf component has not been merged, it is temporarily placed in this repository as a source code for packaging and dependencies.
35 changes: 34 additions & 1 deletion doc/README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,37 @@ Pulsar Flink 连接器也支持 Key-Shared 订阅模式。可以通过配置参
conf.setAuthParams(params);
```

有关认证的详细信息,参见 [Pulsar Security](https://pulsar.apache.org/docs/en/security-overview/)。
有关认证的详细信息,参见 [Pulsar Security](https://pulsar.apache.org/docs/en/security-overview/)。

## ProtoBuf 支持【试验特性】

该功能基于[Flink: New Format of protobuf](https://github.com/apache/flink/pull/14376),目前正处于等待合并中。
该功能在SQL模式使用如下:
```
create table pulsar (
a INT,
b BIGINT,
c BOOLEAN,
d FLOAT,
e DOUBLE,
f VARCHAR(32),
g BYTES,
h VARCHAR(32),
f_abc_7d INT,
`eventTime` TIMESTAMP(3) METADATA,
compute as a + 1,
watermark for eventTime as eventTime
) with (
'connector' = 'pulsar',
'topic' = 'test-protobuf',
'service-url' = 'pulsar://localhost:6650',
'admin-url' = 'http://localhost:8080',
'scan.startup.mode' = 'earliest',
'format' = 'protobuf',
'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.SimpleTest'
)

INSERT INTO pulsar VALUES (1,2,false,0.1,0.01,'haha', ENCODE('1', 'utf-8'), 'IMAGES',1, TIMESTAMP '2020-03-08 13:12:11.123');
```
要求:`SimpleTest`类必须实现`GeneratedMessageV3`。
由于该Flink Format: ProtoBuf组件未合并,暂放于本仓库一份源代码用于打包和依赖。
118 changes: 118 additions & 0 deletions flink-protobuf/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
<version>2.7.6-SNAPSHOT</version>
<!-- <relativePath>..</relativePath>-->
</parent>

<artifactId>flink-protobuf</artifactId>
<name>Flink : Formats : Protobuf</name>

<packaging>jar</packaging>

<properties>
<janino.version>3.0.11</janino.version>
<protoc.version>3.11.1</protoc.version>
<flink.version>1.12.1</flink.version>
<checkstyle.skip>true</checkstyle.skip>
<license.skip>true</license.skip>
<spotbugs.skip>true</spotbugs.skip>
<shade.skip>true</shade.skip>
</properties>

<dependencies>
<!-- core dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protoc.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<!-- this should be the same version of flink-table module -->
<version>${janino.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>${protoc.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protoc.version}</protocVersion>
<inputDirectories>
<include>src/test/proto</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

public class PbCodegenAppender {
private StringBuilder sb;

public PbCodegenAppender() {
sb = new StringBuilder();
}

public void appendLine(String code) {
sb.append(code + ";\n");
}

public void appendSegment(String code) {
sb.append(code + "\n");
}

public String code() {
return sb.toString();
}

public static String printWithLineNumber(String code) {
StringBuilder sb = new StringBuilder();
String[] lines = code.split("\n");
for (int i = 0; i < lines.length; i++) {
sb.append("Line " + (i + 1) + ": " + lines[i] + "\n");
}
return sb.toString();
}

public String printWithLineNumber() {
StringBuilder newSb = new StringBuilder();
String[] lines = sb.toString().split("\n");
for (int i = 0; i < lines.length; i++) {
newSb.append("Line " + (i + 1) + ": " + lines[i] + "\n");
}
return newSb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

public class PbCodegenException extends Exception {
public PbCodegenException() {}

public PbCodegenException(String message) {
super(message);
}

public PbCodegenException(String message, Throwable cause) {
super(message, cause);
}

public PbCodegenException(Throwable cause) {
super(cause);
}

public PbCodegenException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Loading