Merge pull request #2 from griddb/0.6-rc
Update for 0.6
AnggaSuherman authored Sep 6, 2024
2 parents 96722cc + 143c95c commit a0f0108
Showing 11 changed files with 157 additions and 48 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -8,11 +8,11 @@ The Griddb Kafka Connector is a Kafka Connector for loading data to and from GridDB

Building of the library and execution of the sample programs have been checked in the following environment.

OS: CentOS 7.9(x64) / Ubuntu 20.04(x64)
Java: 1.8
Maven: 3.5.0
Kafka: 2.12-3.2.0
GridDB server: V5.0 CE
OS: Ubuntu 24.04(x64)
Java: 17
Maven: 3.9.5
Kafka: 2.13-3.7.1
GridDB server: V5.6 CE, Ubuntu 22.04(x64)

# Build

59 changes: 59 additions & 0 deletions README_ja.md
@@ -0,0 +1,59 @@
GridDB Kafka Connector

# Overview

The GridDB Kafka Connector is software that enables data exchange between [Apache Kafka](https://kafka.apache.org/) and the GridDB database.

# Operating environment

Building of the library and execution of the sample programs have been checked in the following environment.

OS: Ubuntu 24.04(x64)
Java: 17
Maven: 3.9.5
Kafka: 2.13-3.7.1
GridDB server: V5.6 CE, Ubuntu 22.04(x64)

# Build

```console
$ cd GRIDDB_KAFKA_CONNECTOR_FOLDER
$ mvn clean install
```

# Run

After the build, place the GRIDDB_KAFKA_CONNECTOR_FOLDER/target/griddb-kafka-connector-X.X.X.jar file under the KAFKA_FOLDER/libs folder, then start the Kafka server.

Note: X.X.X denotes the version.
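
As an illustration, the deployment might look like the following (a sketch; the folder names are placeholders, and the two start scripts ship with Kafka as shown in the guide):

```console
$ cp GRIDDB_KAFKA_CONNECTOR_FOLDER/target/griddb-kafka-connector-X.X.X.jar KAFKA_FOLDER/libs/
$ cd KAFKA_FOLDER
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
```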

Sample configuration files for running the GridDB Kafka sink connector and source connector are provided in the config folder.

## Running the samples (sink connector and source connector)

["Using Apache Kafka with GridDB database"](docs/GridDB-kafka-sink-connect-and-source-connect-guide.md)をご参照ください。

# Features

The following data types are available (see the schema sketch after the lists below):
- GridDB STRING, BOOL, BYTE, SHORT, INTEGER, LONG, FLOAT, DOUBLE, TIMESTAMP, and BLOB types

The following data types are not available:
- GridDB GEOMETRY and array types
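
For illustration, a Kafka Connect record schema restricted to the supported types could be built as follows (a sketch; the field names and the GridDB mappings in the comments are assumptions based on the list above):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class SupportedTypesSchemaSketch {
    public static void main(String[] args) {
        // Build a struct schema using only types the connector is expected
        // to map onto GridDB column types.
        Schema schema = SchemaBuilder.struct()
                .field("ts", Timestamp.SCHEMA)          // assumed -> TIMESTAMP
                .field("name", Schema.STRING_SCHEMA)    // assumed -> STRING
                .field("active", Schema.BOOLEAN_SCHEMA) // assumed -> BOOL
                .field("count", Schema.INT64_SCHEMA)    // assumed -> LONG
                .field("value", Schema.FLOAT64_SCHEMA)  // assumed -> DOUBLE
                .field("payload", Schema.BYTES_SCHEMA)  // assumed -> BLOB
                .build();
        System.out.println(schema.fields());
    }
}
```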

## Community
* Issues
  Please use the issue function for questions and bug reports.
* PullRequest
  You need to agree to the GridDB Contributor License Agreement (CLA_rev1.1.pdf).
  If you use the PullRequest function, you are deemed to have agreed to the GridDB Contributor License Agreement.

# License

The GridDB Kafka Connector is licensed under the Apache License, version 2.0.

# Trademarks

Apache Kafka, Kafka are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
2 changes: 2 additions & 0 deletions config/griddb-sink.properties
@@ -9,6 +9,8 @@ password=admin
notification.member=
notification.provider.url=

container.type=

topics.regex=csh(.*)
# topics=

1 change: 1 addition & 0 deletions config/griddb-source.properties
@@ -10,6 +10,7 @@ notification.member=
notification.provider.url=

poll.interval.ms=5000
batch.max.rows=

containers=timestampContainer

60 changes: 31 additions & 29 deletions docs/GridDB-kafka-sink-connect-and-source-connect-guide.md
@@ -88,19 +88,19 @@ Note: Please refer to an example in GridDB written for 2 modes as below.

Building of the library and execution of the sample programs have been checked in the following environment.

OS: CentOS 7.9(x64)/Ubuntu 20.04
Java: 1.8
Maven: 3.5.0
Kafka: 2.12-3.2.0
GridDB server: V5.0 CE
OS: Ubuntu 24.04(x64)
Java: 17
Maven: 3.9.5
Kafka: 2.13-3.7.1
GridDB server: V5.6 CE, Ubuntu 22.04(x64)

### Install and start Apache Kafka

```console
$ wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.12-3.2.0.tgz
$ tar xzvf kafka_2.12-3.2.0.tgz
$ cd kafka_2.12-3.2.0
$ export PATH=$PATH:/path/to/kafka_2.12-3.2.0/bin
$ wget https://dlcdn.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
$ tar xzvf kafka_2.13-3.7.1.tgz
$ cd kafka_2.13-3.7.1
$ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin
$ zookeeper-server-start.sh -daemon config/zookeeper.properties # Start zookeeper server
$ kafka-server-start.sh config/server.properties # Start Apache Kafka server
```
@@ -134,7 +134,7 @@ Note:

2. After running the command `$ mvn clean install`, the file griddb-kafka-connector-X.X.jar will be created in the "target/" folder.

3. Copy the griddb-kafka-connector-X.X.jar file to /path/to/kafka_2.12-3.2.0/libs/
3. Copy the griddb-kafka-connector-X.X.jar file to /path/to/kafka_2.13-3.7.1/libs/

## About GridDB Kafka sink connector

@@ -156,7 +156,7 @@ Note:
```
* Export PATH for executing script_sink.sh
```console
$ export PATH=$PATH:/path/to/kafka_2.12-3.2.0/bin
$ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin
```
* Run script_sink.sh for Apache Kafka
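
The contents of script_sink.sh are not shown in this diff; as a rough sketch, producing a sample record to a topic matched by the sink's `topics.regex` might look like the following (the topic name, bootstrap address, and record payload are assumptions):

```console
$ echo '{"ts":"2024-09-06T00:00:00.000Z","value":1.0}' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic csh01
```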

@@ -184,7 +184,7 @@ Note:
```
2. Open a new terminal for executing this command:
```console
$ cd kafka_2.12-3.2.0
$ cd kafka_2.13-3.7.1
$ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-sink.properties
```
3. After the command above finishes, the topic data is pushed into the GridDB database.
@@ -228,7 +228,8 @@ The 1 results had been acquired.
|batch.size|the size of write buffer to GridDB|3000|
|multiput|using multiput or single put in write buffer|true|
|container.name.format|format used to derive the GridDB container name from the topic name|$(topic): the default container name is the topic name|
|container.type|The GridDB container type: either `COLLECTION` or `TIME_SERIES`. A TIME_SERIES container is created when `container.type` is TIME_SERIES and the first column is TIMESTAMP; otherwise a COLLECTION container is created.|COLLECTION|

Note:
* In the file config/griddb-sink.properties, the config values (connector.class, name, topics.regex or topics, transforms) are properties used by Apache Kafka, not by the connector.
* Configure only one of "topics.regex" and "topics".
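
For illustration, a minimal sink configuration using the new `container.type` parameter might look like this (a sketch: the connection values are placeholders, the connector class name is inferred by analogy with the source connector class, and `topics.regex` mirrors the shipped sample):

```properties
connector.class=com.github.griddb.kafka.connect.GriddbSinkConnector
name=griddb-kafka-sink
host=239.0.0.1
port=31999
cluster.name=myCluster
user=admin
password=admin
# Create TIME_SERIES containers when the first column is TIMESTAMP
container.type=TIME_SERIES
topics.regex=csh(.*)
```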
@@ -277,7 +278,7 @@ Note:

2. Open a new terminal to execute the commands:
```console
$ cd kafka_2.12-3.2.0
$ cd kafka_2.13-3.7.1
$ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-source.properties
```
Note:
@@ -311,21 +312,22 @@ Note:

* GridDB Kafka source connector config parameters in the config file

|Parameter | Description | Default Value |
|---|---|---|
|connector.class|the source connector class|com.github.griddb.kafka.connect.GriddbSourceConnector|
|name|the connector name| |
|host|GridDB host or multicast address| |
|port|GridDB port or multicast port| |
|cluster.name|GridDB cluster name| |
|user|GridDB username| |
|password|GridDB user password| |
|notification.member|GridDB notification member list in fixed list method| |
|containers|list of GridDB containers used by the source connector| |
|mode|the mode to import (bulk/timestamp)| |
|timestamp.column.name|the list of timestamp column in timestamp mode| |
|topic.prefix|the prefix of output topic| |
|polling.interval.ms|interval time for GridDB Kafka source connector to poll data|5000|
| Parameter | Description | Default Value |
| --------------------- | ------------------------------------------------------------ | ----------------------------------------------------- |
| connector.class | the source connector class | com.github.griddb.kafka.connect.GriddbSourceConnector |
| name | the connector name | |
| host | GridDB host or multicast address | |
| port | GridDB port or multicast port | |
| cluster.name | GridDB cluster name | |
| user | GridDB username | |
| password | GridDB user password | |
| notification.member | GridDB notification member list in fixed list method | |
| containers | list of GridDB containers used by the source connector | |
| mode | the mode to import (bulk/timestamp) | |
| timestamp.column.name | the list of timestamp column in timestamp mode | |
| batch.max.rows        | the maximum number of rows fetched from GridDB per poll       | 100                                                   |
| topic.prefix | the prefix of output topic | |
| polling.interval.ms | interval time for GridDB Kafka source connector to poll data | 5000 |

Note:
* In the file config/griddb-source.properties, the config values (connector.class, name) are properties used by Kafka, not by the connector.
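
Similarly, a minimal source configuration exercising the new `batch.max.rows` parameter might look like this (a sketch; the connection values are placeholders and the `mode`/`topic.prefix` values are illustrative):

```properties
connector.class=com.github.griddb.kafka.connect.GriddbSourceConnector
name=griddb-kafka-source
host=239.0.0.1
port=31999
cluster.name=myCluster
user=admin
password=admin
containers=timestampContainer
mode=bulk
topic.prefix=gds_
poll.interval.ms=5000
# Cap the number of rows fetched from GridDB per poll (default 100)
batch.max.rows=100
```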
12 changes: 5 additions & 7 deletions pom.xml
@@ -3,7 +3,7 @@
<groupId>com.github.griddb</groupId>
<artifactId>griddb-kafka-connector</artifactId>
<packaging>jar</packaging>
<version>0.5</version>
<version>0.6</version>
<name>griddb-kafka-connector</name>
<url>http://maven.apache.org</url>

@@ -18,17 +18,17 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.2.0</version>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.2.0</version>
<artifactId>kafka_2.13</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>com.github.griddb</groupId>
<artifactId>gridstore</artifactId>
<version>5.0.0</version>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
@@ -43,8 +43,6 @@
<arg>-Xlint:all</arg>
<arg>-Werror</arg>
</compilerArgs>
<source>1.8</source>
<target>1.8</target>

</configuration>
</plugin>
@@ -44,6 +44,7 @@
import com.toshiba.mwcloud.gs.TimestampUtils;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema.Type;
@@ -151,6 +152,24 @@ public ContainerInfo getContainerInfo(String containerName) throws GSException {
public Container<?, Row> putContainer(String containerName, Collection<SinkRecordField> fields) throws GSException {

ContainerType type = ContainerType.COLLECTION;
String containerType =
config.getString(GriddbSinkConnectorConfig.CONTAINER_TYPE_CONFIG);

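// Default to COLLECTION (set above); switch to TIME_SERIES below only when
// container.type is TIME_SERIES and the first column is a GridDB TIMESTAMP.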
switch (containerType) {
case GriddbSinkConnectorConfig.CONTAINER_TYPE_COLLECTION:
break;
case GriddbSinkConnectorConfig.CONTAINER_TYPE_TIME_SERIES:
if (fields.size() > 0) {
SinkRecordField firstColumn = fields.stream().findFirst().get();
if (this.getGsType(firstColumn) == GSType.TIMESTAMP) {
type = ContainerType.TIME_SERIES;
}
}
break;
default:
throw new ConfigException("Invalid type: " + containerType);
}

List<ColumnInfo> columnInfoList = new ArrayList<>();
boolean rowKeyAssigned = true;
boolean modifiable = true;
GriddbSinkConnectorConfig.java
@@ -42,6 +42,18 @@ public class GriddbSinkConnectorConfig extends AbstractConfig {
public static final String NOTIFICATION_PROVIDER_CONFIG = "notification.provider.url";

public static final String USE_MULTIPUT_CONFIG = "multiput";
/** Container type config string. */
public static final String CONTAINER_TYPE_CONFIG = "container.type";
/** Container type option collection. */
public static final String CONTAINER_TYPE_COLLECTION = "COLLECTION";
/** Container type option time series. */
public static final String CONTAINER_TYPE_TIME_SERIES = "TIME_SERIES";
/** Container type default. */
public static final String CONTAINER_TYPE_DEFAULT =
CONTAINER_TYPE_COLLECTION;
/** Container type document. */
private static final String CONTAINER_TYPE_DOC = "Specifies the type "
+ "of GridDB container: collection or time series";

public static final List<String> DEFAULT_KAFKA_PK_NAMES = Collections
.unmodifiableList(Arrays.asList("__connect_topic", "__connect_partition", "__connect_offset"));
@@ -83,6 +95,10 @@ public class GriddbSinkConnectorConfig extends AbstractConfig {
.defineInternal(PASSWORD_CONFIG, Type.STRING, "", Importance.HIGH)
.defineInternal(NOTIFICATION_MEMBER_CONFIG, Type.STRING, "", Importance.HIGH)
.defineInternal(NOTIFICATION_PROVIDER_CONFIG, Type.STRING, "", Importance.HIGH)
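// container.type accepts only COLLECTION or TIME_SERIES and defaults to COLLECTION.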
.define(CONTAINER_TYPE_CONFIG, Type.STRING, CONTAINER_TYPE_DEFAULT,
ConfigDef.ValidString.in(CONTAINER_TYPE_COLLECTION,
CONTAINER_TYPE_TIME_SERIES), Importance.HIGH,
CONTAINER_TYPE_DOC)
.define(CONTAINER_NAME_FORMAT, ConfigDef.Type.STRING, CONTAINER_NAME_FORMAT_DEFAULT,
ConfigDef.Importance.MEDIUM, CONTAINER_NAME_FORMAT_DOC, DATAMAPPING_GROUP, 1, ConfigDef.Width.LONG,
CONTAINER_NAME_FORMAT_DISPLAY)
GriddbSourceConnectorConfig.java
@@ -43,6 +43,13 @@ public class GriddbSourceConnectorConfig extends AbstractConfig {
private static final ConfigDef.Range POSITIVE_INT_VALIDATOR = ConfigDef.Range.atLeast(1);
private static final int POLLING_INTERVAL_DEFAULT = 5000;
private static final String POLL_INTERVAL_DOC = "Frequency in ms to poll for new data in each table.";
/** Batch max row config. */
public static final String BATCH_MAX_ROW_CONFIG = "batch.max.rows";
/** Batch max row default. */
private static final int BATCH_MAX_ROW_DEFAULT = 100;
/** Batch max row document. */
private static final String BATCH_MAX_ROW_DOC =
"Maximum number of rows to fetch from GridDB in a single poll";

public static final String DATABASE_GROUP = "Database";
public static final String MODE_GROUP = "Mode";
@@ -103,6 +110,7 @@ public class GriddbSourceConnectorConfig extends AbstractConfig {
.defineInternal(NOTIFICATION_MEMBER_CONFIG, Type.STRING, "", Importance.HIGH)
.defineInternal(NOTIFICATION_PROVIDER_CONFIG, Type.STRING, "", Importance.HIGH)
.define(POLLING_INTERVAL_CONFIG, Type.INT, POLLING_INTERVAL_DEFAULT, POSITIVE_INT_VALIDATOR, Importance.HIGH, POLL_INTERVAL_DOC)
.defineInternal(BATCH_MAX_ROW_CONFIG, Type.INT, BATCH_MAX_ROW_DEFAULT, POSITIVE_INT_VALIDATOR, Importance.HIGH, BATCH_MAX_ROW_DOC)
.define(CONTAINERS_CONFIG, Type.LIST, Importance.HIGH, CONTAINERS_DOC)
.define(MODE_CONFIG, Type.STRING, MODE_UNSPECIFIED,
ConfigDef.ValidString.in(MODE_UNSPECIFIED, MODE_BULK, MODE_TIMESTAMP), Importance.HIGH, MODE_DOC,
@@ -130,4 +138,4 @@ public GriddbSourceConnectorConfig(Map<String, ?> props) {

containers = getList(CONTAINERS_CONFIG);
}
}
}
@@ -99,12 +99,16 @@ public List<SourceRecord> poll() throws InterruptedException {
}

final List<SourceRecord> results = new ArrayList<>();
final int batchMaxRow =
config.getInt(GriddbSourceConnectorConfig.BATCH_MAX_ROW_CONFIG);

boolean hadNext = true;
try {
querier.startQuery();

while (hadNext = querier.hasNext()) {
hadNext = querier.hasNext();
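// Collect records until the querier is exhausted or the batch.max.rows cap is reached.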
while (hadNext && (results.size() < batchMaxRow)) {
results.add(querier.extractRecord());
hadNext = querier.hasNext();
}
} catch (GSException e) {
LOG.error("Failed to run query for container {}: {}", querier.toString(), e);
8 changes: 4 additions & 4 deletions src/main/resources/connector.properties
@@ -1,4 +1,4 @@
sink_task_version=GriddbSinkTask_0.5.0
sink_connector_version=GriddbSinkConnector_0.5.0
source_task_version=GriddbSourceTask_0.5.0
source_connector_version=GriddbSourceConnector_0.5.0
sink_task_version=GriddbSinkTask_0.6.0
sink_connector_version=GriddbSinkConnector_0.6.0
source_task_version=GriddbSourceTask_0.6.0
source_connector_version=GriddbSourceConnector_0.6.0
