diff --git a/README.md b/README.md index 8de62f4..5bc832a 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,11 @@ The Griddb Kafka Connector is a Kafka Connector for loading data to and from Gri Building of the library and execution of the sample programs have been checked in the following environment. - OS: CentOS 7.9(x64) / Ubuntu 20.04(x64) - Java: 1.8 - Maven: 3.5.0 - Kafka: 2.12-3.2.0 - GridDB server: V5.0 CE + OS: Ubuntu 24.04(x64) + Java: 17 + Maven: 3.9.5 + Kafka: 2.13-3.7.1 + GridDB server: V5.6 CE, Ubuntu 22.04(x64) # Build diff --git a/README_ja.md b/README_ja.md new file mode 100644 index 0000000..cff25c1 --- /dev/null +++ b/README_ja.md @@ -0,0 +1,59 @@ +GridDB Kafkaコネクタ + +# 概要 + +GridDB Kafkaコネクタは、[Apache Kafka](https://kafka.apache.org/)とGridDBデータベースとの間でデータのやり取りを可能にする +ソフトウェアです。 + +# 動作環境 + +以下の環境でビルドとサンプルプログラムの実行を確認しています。 + + OS: Ubuntu 24.04(x64) + Java: 17 + Maven: 3.9.5 + Kafka: 2.13-3.7.1 + GridDB server: V5.6 CE, Ubuntu 22.04(x64) + +# ビルド + +```console +$ cd GRIDDB_KAFKA_CONNECTOR_FOLDER +$ mvn clean install +``` + +# 実行 + +ビルド後、GRIDDB_KAFKA_CONNECTOR_FOLDER/target/griddb-kafka-connector-X.X.X.jarファイルをKAFKA_FOLDER/libsフォルダの下に配置してください。 +そして、Kafkaサーバを起動してください。 + +※ X.X.Xはバージョンを意味します。 + +configフォルダ上に、GridDB KafkaコネクタのSink connector、Source connectorを動かすためのコンフィグファイルのサンプルがあります。 + +## サンプルの実行(Sink connector、Source connector) + +["Using Apache Kafka with GridDB database"](docs/GridDB-kafka-sink-connect-and-source-connect-guide.md)をご参照ください。 + +# 機能 + +以下のデータ型が利用可能です。 +- GridDBのSTRING型, BOOL型, BYTE型, SHORT型, INTEGER型, LONG型, FLOAT型, DOUBLE型, TIMESTAMP型, BLOB型 + +以下のデータ型は利用できません。 +- GridDbのGEOMETRY型, 配列型 + +## コミュニティ + * Issues + 質問、不具合報告はissue機能をご利用ください。 + * PullRequest + GridDB Contributor License Agreement(CLA_rev1.1.pdf)に同意して頂く必要があります。 + PullRequest機能をご利用の場合はGridDB Contributor License Agreementに同意したものとみなします。 + +# ライセンス + + GridDB KafkaコネクタのライセンスはApache License, version 2.0です。 + +# 商標 + + Apache Kafka, Kafka are either registered trademarks or 
trademarks of the Apache Software Foundation in the United States and/or other countries. diff --git a/config/griddb-sink.properties b/config/griddb-sink.properties index b387310..e956a0e 100644 --- a/config/griddb-sink.properties +++ b/config/griddb-sink.properties @@ -9,6 +9,8 @@ password=admin notification.member= notification.provider.url= +container.type= + topics.regex=csh(.*) # topics= diff --git a/config/griddb-source.properties b/config/griddb-source.properties index c60cf3b..f9094dd 100644 --- a/config/griddb-source.properties +++ b/config/griddb-source.properties @@ -10,6 +10,7 @@ notification.member= notification.provider.url= poll.interval.ms=5000 +batch.max.rows= containers=timestampContainer diff --git a/docs/GridDB-kafka-sink-connect-and-source-connect-guide.md b/docs/GridDB-kafka-sink-connect-and-source-connect-guide.md index d482b94..eedc127 100644 --- a/docs/GridDB-kafka-sink-connect-and-source-connect-guide.md +++ b/docs/GridDB-kafka-sink-connect-and-source-connect-guide.md @@ -88,19 +88,19 @@ Note: Please refer to an example in GridDB written for 2 modes as below. Building of the library and execution of the sample programs have been checked in the following environment. 
- OS: CentOS 7.9(x64)/Ubuntu 20.04 - Java: 1.8 - Maven: 3.5.0 - Kafka: 2.12-3.2.0 - GridDB server: V5.0 CE + OS: Ubuntu 24.04(x64) + Java: 17 + Maven: 3.9.5 + Kafka: 2.13-3.7.1 + GridDB server: V5.6 CE, Ubuntu 22.04(x64) ### Install and start Apache Kafka ```console -$ wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.12-3.2.0.tgz -$ tar xzvf kafka_2.12-3.2.0.tgz -$ cd kafka_2.12-3.2.0 -$ export PATH=$PATH:/path/to/kafka_2.12-3.2.0/bin +$ wget https://dlcdn.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz +$ tar xzvf kafka_2.13-3.7.1.tgz +$ cd kafka_2.13-3.7.1 +$ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin $ zookeeper-server-start.sh -daemon config/zookeeper.properties # Start zookeeper server $ kafka-server-start.sh config/server.properties # Start Apache Kafka server ``` @@ -134,7 +134,7 @@ Note: 2. After using command `$ mvn clean install`, file griddb-kafka-connector-X.X.jar will be created in "target/" folder. -3. Copy the griddb-kafka-connector-X.X.jar file to /path/to/kafka_2.12-3.2.0/libs/ +3. Copy the griddb-kafka-connector-X.X.jar file to /path/to/kafka_2.13-3.7.1/libs/ ## About GridDB Kafka sink connector @@ -156,7 +156,7 @@ Note: ``` * Export PATH for executing script_sink.sh ```console - $ export PATH=$PATH:/path/to/kafka_2.12-3.2.0/bin + $ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin ``` * Run script_sink.sh for Apache Kafka @@ -184,7 +184,7 @@ Note: ``` 2. Open a new terminal for executing this command: ```console - $ cd kafka_2.12-3.2.0 + $ cd kafka_2.13-3.7.1 $ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-sink.properties ``` 3. After finishing the command above, data/topic was pushed into GridDB database. @@ -228,7 +228,8 @@ The 1 results had been acquired. 
|batch.size|the size of write buffer to GridDB|3000| |multiput|using multiput or single put in write buffer|true| |container.name.format|using it to change to topic name from GridDB container|$(topic): The default container name is topic name | - + |container.type|The GridDB container type: either `COLLECTION` or `TIME_SERIES`. A TIME_SERIES container is created only when `container.type` is `TIME_SERIES` and the first column is of type TIMESTAMP; otherwise a COLLECTION container is created.|COLLECTION | + Note: * In file config/griddb-sink.properties: config values (connector.class, name, topics.regex or topics, transforms) are the properties used by Apache Kafka, not the connector). * Just configure one property between "topics.regex" and "topics". @@ -277,7 +278,7 @@ Note: 2. Open new terminal for executing commands: ```console - $ cd kafka_2.12-3.2.0 + $ cd kafka_2.13-3.7.1 $ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-source.properties ``` Note: @@ -311,21 +312,22 @@ Note: * GridDB Kafka source connector config parameters in the config file - |Parameter | Description | Default Value | - |---|---|---| - |connector.class|the source connector class|com.github.griddb.kafka.connect.GriddbSourceConnector| - |name|the connector name| | - |host|GridDB host or multicast address| | - |port|GridDB port or multicast port| | - |cluster.name|GridDB cluster name| | - |user|GridDB username| | - |password|GridDB user password| | - |notification.member|GridDB notification member list in fixed list method| | - |containers|list of GridDB containers used by the source connector| | - |mode|the mode to import (bulk/timestamp)| | - |timestamp.column.name|the list of timestamp column in timestamp mode| | - |topic.prefix|the prefix of output topic| | - |polling.interval.ms|interval time for GridDB Kafka source connector to poll data|5000| + | Parameter | Description | Default Value | + | --------------------- | 
------------------------------------------------------------ | ----------------------------------------------------- | + | connector.class | the source connector class | com.github.griddb.kafka.connect.GriddbSourceConnector | + | name | the connector name | | + | host | GridDB host or multicast address | | + | port | GridDB port or multicast port | | + | cluster.name | GridDB cluster name | | + | user | GridDB username | | + | password | GridDB user password | | + | notification.member | GridDB notification member list in fixed list method | | + | containers | list of GridDB containers used by the source connector | | + | mode | the mode to import (bulk/timestamp) | | + | timestamp.column.name | the list of timestamp column in timestamp mode | | + | batch.max.rows | The maximum rows for a batch | 100 | + | topic.prefix | the prefix of output topic | | + | polling.interval.ms | interval time for GridDB Kafka source connector to poll data | 5000 | Note: * In file config/griddb-source.properties: the config values (connector.class, name is the properties used by Kafka, not the connector). 
diff --git a/pom.xml b/pom.xml index 0690c45..6301335 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.github.griddb griddb-kafka-connector jar - 0.5 + 0.6 griddb-kafka-connector http://maven.apache.org @@ -18,17 +18,17 @@ org.apache.kafka connect-api - 3.2.0 + 3.7.1 org.apache.kafka - kafka_2.12 - 3.2.0 + kafka_2.13 + 3.7.1 com.github.griddb gridstore - 5.0.0 + 5.6.0 @@ -43,8 +43,6 @@ -Xlint:all -Werror - 1.8 - 1.8 diff --git a/src/main/java/com/github/griddb/kafka/connect/dialect/GriddbDatabaseDialect.java b/src/main/java/com/github/griddb/kafka/connect/dialect/GriddbDatabaseDialect.java index ba0ea26..1162690 100644 --- a/src/main/java/com/github/griddb/kafka/connect/dialect/GriddbDatabaseDialect.java +++ b/src/main/java/com/github/griddb/kafka/connect/dialect/GriddbDatabaseDialect.java @@ -44,6 +44,7 @@ import com.toshiba.mwcloud.gs.TimestampUtils; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema.Type; @@ -151,6 +152,24 @@ public ContainerInfo getContainerInfo(String containerName) throws GSException { public Container putContainer(String containerName, Collection fields) throws GSException { ContainerType type = ContainerType.COLLECTION; + String containerType = + config.getString(GriddbSinkConnectorConfig.CONTAINER_TYPE_CONFIG); + + switch (containerType) { + case GriddbSinkConnectorConfig.CONTAINER_TYPE_COLLECTION: + break; + case GriddbSinkConnectorConfig.CONTAINER_TYPE_TIME_SERIES: + if (fields.size() > 0) { + SinkRecordField firstColumn = fields.stream().findFirst().get(); + if (this.getGsType(firstColumn) == GSType.TIMESTAMP) { + type = ContainerType.TIME_SERIES; + } + } + break; + default: + throw new ConfigException("Invalid type: " + containerType); + } + List columnInfoList = new ArrayList<>(); boolean rowKeyAssigned = true; boolean modifiable = true; 
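Review note on the `putContainer` hunk above: the new switch only honors `TIME_SERIES` when the first column is a TIMESTAMP, and silently falls back to `COLLECTION` otherwise. The following is a minimal standalone sketch of that decision, not the connector's code. The class `ContainerTypeSketch`, the enum `ContainerKind`, and the use of plain type-name strings are assumptions made for illustration; the real code works with `SinkRecordField`, `GSType`, and `ContainerType`.

```java
import java.util.List;

/** Illustrative only: mirrors the container-type decision with hypothetical names. */
final class ContainerTypeSketch {
    enum ContainerKind { COLLECTION, TIME_SERIES }

    /**
     * @param configured  value of container.type ("COLLECTION" or "TIME_SERIES")
     * @param columnTypes type names of the columns, in column order
     */
    static ContainerKind resolve(String configured, List<String> columnTypes) {
        switch (configured) {
            case "COLLECTION":
                return ContainerKind.COLLECTION;
            case "TIME_SERIES":
                // TIME_SERIES is honored only when the first column is a TIMESTAMP.
                if (!columnTypes.isEmpty() && "TIMESTAMP".equals(columnTypes.get(0))) {
                    return ContainerKind.TIME_SERIES;
                }
                // Otherwise fall back to COLLECTION, as the hunk above does.
                return ContainerKind.COLLECTION;
            default:
                throw new IllegalArgumentException("Invalid type: " + configured);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("TIME_SERIES", List.of("TIMESTAMP", "DOUBLE"))); // TIME_SERIES
        System.out.println(resolve("TIME_SERIES", List.of("STRING", "TIMESTAMP"))); // COLLECTION
    }
}
```

Because the fallback is silent, a user who sets `container.type=TIME_SERIES` on a topic whose first field is not a timestamp gets a COLLECTION container without any warning; logging that case may be worth considering.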
diff --git a/src/main/java/com/github/griddb/kafka/connect/sink/GriddbSinkConnectorConfig.java b/src/main/java/com/github/griddb/kafka/connect/sink/GriddbSinkConnectorConfig.java index 8af2a0a..9402bae 100644 --- a/src/main/java/com/github/griddb/kafka/connect/sink/GriddbSinkConnectorConfig.java +++ b/src/main/java/com/github/griddb/kafka/connect/sink/GriddbSinkConnectorConfig.java @@ -42,6 +42,18 @@ public class GriddbSinkConnectorConfig extends AbstractConfig { public static final String NOTIFICATION_PROVIDER_CONFIG = "notification.provider.url"; public static final String USE_MULTIPUT_CONFIG = "multiput"; + /** Container type config string. */ + public static final String CONTAINER_TYPE_CONFIG = "container.type"; + /** Container type option collection. */ + public static final String CONTAINER_TYPE_COLLECTION = "COLLECTION"; + /** Container type option time series. */ + public static final String CONTAINER_TYPE_TIME_SERIES = "TIME_SERIES"; + /** Container type default. */ + public static final String CONTAINER_TYPE_DEFAULT = + CONTAINER_TYPE_COLLECTION; + /** Container type document. 
*/ + private static final String CONTAINER_TYPE_DOC = "Specifies the type " + + "of GridDB container: COLLECTION or TIME_SERIES"; public static final List DEFAULT_KAFKA_PK_NAMES = Collections .unmodifiableList(Arrays.asList("__connect_topic", "__connect_partition", "__connect_offset")); @@ -83,6 +95,10 @@ public class GriddbSinkConnectorConfig extends AbstractConfig { .defineInternal(PASSWORD_CONFIG, Type.STRING, "", Importance.HIGH) .defineInternal(NOTIFICATION_MEMBER_CONFIG, Type.STRING, "", Importance.HIGH) .defineInternal(NOTIFICATION_PROVIDER_CONFIG, Type.STRING, "", Importance.HIGH) + .define(CONTAINER_TYPE_CONFIG, Type.STRING, CONTAINER_TYPE_DEFAULT, + ConfigDef.ValidString.in(CONTAINER_TYPE_COLLECTION, + CONTAINER_TYPE_TIME_SERIES), Importance.HIGH, + CONTAINER_TYPE_DOC) .define(CONTAINER_NAME_FORMAT, ConfigDef.Type.STRING, CONTAINER_NAME_FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, CONTAINER_NAME_FORMAT_DOC, DATAMAPPING_GROUP, 1, ConfigDef.Width.LONG, CONTAINER_NAME_FORMAT_DISPLAY) diff --git a/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceConnectorConfig.java b/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceConnectorConfig.java index d89eaa4..cd373c1 100644 --- a/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceConnectorConfig.java +++ b/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceConnectorConfig.java @@ -43,6 +43,13 @@ public class GriddbSourceConnectorConfig extends AbstractConfig { private static final ConfigDef.Range POSITIVE_INT_VALIDATOR = ConfigDef.Range.atLeast(1); private static final int POLLING_INTERVAL_DEFAULT = 5000; private static final String POLL_INTERVAL_DOC = "Frequency in ms to poll for new data in each table."; + /** Batch max row config. */ + public static final String BATCH_MAX_ROW_CONFIG = "batch.max.rows"; + /** Batch max row default. */ + private static final int BATCH_MAX_ROW_DEFAULT = 100; + /** Batch max row document. 
*/ + private static final String BATCH_MAX_ROW_DOC = + "Max number of rows to fetch from GridDB in a single poll"; public static final String DATABASE_GROUP = "Database"; public static final String MODE_GROUP = "Mode"; @@ -103,6 +110,7 @@ public class GriddbSourceConnectorConfig extends AbstractConfig { .defineInternal(NOTIFICATION_MEMBER_CONFIG, Type.STRING, "", Importance.HIGH) .defineInternal(NOTIFICATION_PROVIDER_CONFIG, Type.STRING, "", Importance.HIGH) .define(POLLING_INTERVAL_CONFIG, Type.INT, POLLING_INTERVAL_DEFAULT, POSITIVE_INT_VALIDATOR, Importance.HIGH, POLL_INTERVAL_DOC) + .defineInternal(BATCH_MAX_ROW_CONFIG, Type.INT, BATCH_MAX_ROW_DEFAULT, POSITIVE_INT_VALIDATOR, Importance.HIGH, BATCH_MAX_ROW_DOC) .define(CONTAINERS_CONFIG, Type.LIST, Importance.HIGH, CONTAINERS_DOC) .define(MODE_CONFIG, Type.STRING, MODE_UNSPECIFIED, ConfigDef.ValidString.in(MODE_UNSPECIFIED, MODE_BULK, MODE_TIMESTAMP), Importance.HIGH, MODE_DOC, @@ -130,4 +138,4 @@ public GriddbSourceConnectorConfig(Map props) { containers = getList(CONTAINERS_CONFIG); } -} \ No newline at end of file +} diff --git a/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceTask.java b/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceTask.java index 96e6563..822ac6e 100644 --- a/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceTask.java +++ b/src/main/java/com/github/griddb/kafka/connect/source/GriddbSourceTask.java @@ -99,12 +99,16 @@ public List poll() throws InterruptedException { } final List results = new ArrayList<>(); + final int batchMaxRow = + config.getInt(GriddbSourceConnectorConfig.BATCH_MAX_ROW_CONFIG); + boolean hadNext = true; try { querier.startQuery(); - - while (hadNext = querier.hasNext()) { + hadNext = querier.hasNext(); + while (hadNext && (results.size() < batchMaxRow)) { results.add(querier.extractRecord()); + hadNext = querier.hasNext(); } } catch (GSException e) { LOG.error("Failed to run query for container {}: {}", querier.toString(), 
e); diff --git a/src/main/resources/connector.properties b/src/main/resources/connector.properties index fe4dd8e..76469bb 100644 --- a/src/main/resources/connector.properties +++ b/src/main/resources/connector.properties @@ -1,4 +1,4 @@ -sink_task_version=GriddbSinkTask_0.5.0 -sink_connector_version=GriddbSinkConnector_0.5.0 -source_task_version=GriddbSourceTask_0.5.0 -source_connector_version=GriddbSourceConnector_0.5.0 \ No newline at end of file +sink_task_version=GriddbSinkTask_0.6.0 +sink_connector_version=GriddbSinkConnector_0.6.0 +source_task_version=GriddbSourceTask_0.6.0 +source_connector_version=GriddbSourceConnector_0.6.0 \ No newline at end of file
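Review note on the `GriddbSourceTask.poll()` change: the loop now stops once `batch.max.rows` records are collected and leaves the remaining rows for a later poll. The following is a minimal standalone sketch of that capped-drain pattern, assuming a plain `java.util.Iterator` in place of the connector's querier. `BatchedPollSketch` and `pollBatch` are hypothetical names, not part of the connector.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative only: capped draining of a cursor, as in the poll() hunk above. */
final class BatchedPollSketch {
    /** Returns at most batchMaxRows items; anything left stays in the cursor. */
    static <T> List<T> pollBatch(Iterator<T> cursor, int batchMaxRows) {
        List<T> results = new ArrayList<>();
        boolean hadNext = cursor.hasNext();
        while (hadNext && results.size() < batchMaxRows) {
            results.add(cursor.next());
            hadNext = cursor.hasNext();
        }
        return results;
    }

    public static void main(String[] args) {
        Iterator<Integer> cursor = List.of(1, 2, 3, 4, 5).iterator();
        System.out.println(pollBatch(cursor, 2)); // [1, 2]
        System.out.println(pollBatch(cursor, 2)); // [3, 4]
        System.out.println(pollBatch(cursor, 2)); // [5]
    }
}
```

The sketch only works across calls because the cursor keeps its position. Whether the connector's querier keeps its position after a capped batch depends on code outside this diff, so that part is worth checking in review.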