From f73d18cf59ff2ae2218f7e091f7d39c6fabc94fb Mon Sep 17 00:00:00 2001 From: hailin0 Date: Fri, 23 Sep 2022 23:33:04 +0800 Subject: [PATCH] [Imporve][Connector-V2] Imporve iotdb connector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Source: * Add e2e testcase * Add example into documents * Support `align by` sql syntax * Support sql split ignore case * Support restore split offset to `at-least-once` * Support read timestamp from `RowRecord` * Fix assign split: splitId % taskSize -> splitId % readerSize * Fix assign split owner(negative number) Sink: * Support extract `timestamp`、`device`、`measurement` from SeaTunnelRow * Support TINYINT、SMALLINT * Add example into documents * Support flush cache to database before `prepareCommit` --- docs/en/connector-v2/sink/IoTDB.md | 143 +++++++--- docs/en/connector-v2/source/IoTDB.md | 69 ++++- .../common/sink/AbstractSinkWriter.java | 2 +- .../seatunnel/iotdb/config/SinkConfig.java | 47 ++- .../iotdb/constant/SourceConstants.java | 2 + .../DefaultSeaTunnelRowDeserializer.java | 107 +++++++ .../DefaultSeaTunnelRowSerializer.java | 268 ++++++++++-------- .../serialize/SeaTunnelRowDeserializer.java | 27 ++ .../seatunnel/iotdb/sink/IoTDBSinkClient.java | 2 +- .../seatunnel/iotdb/sink/IoTDBSinkWriter.java | 13 +- .../seatunnel/iotdb/source/IoTDBSource.java | 2 +- .../iotdb/source/IoTDBSourceReader.java | 83 ++---- .../iotdb/source/IoTDBSourceSplit.java | 3 + .../source/IoTDBSourceSplitEnumerator.java | 134 ++++++--- .../iotdb/state/IoTDBSourceState.java | 19 +- .../flink/v2/iotdb/FakeSourceToIoTDBIT.java | 128 --------- .../seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java | 209 ++++++++++++++ ...o_iotdb.conf => iotdb_source_to_sink.conf} | 43 +-- .../spark/v2/iotdb/FakeSourceToIoTDBIT.java | 127 --------- .../seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java | 208 ++++++++++++++ .../resources/iotdb/fakesource_to_iotdb.conf | 41 +-- .../resources/iotdb/iotdb_source_to_sink.conf | 73 +++++ 22 files 
changed, 1169 insertions(+), 581 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowDeserializer.java delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java rename seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/{fakesource_to_iotdb.conf => iotdb_source_to_sink.conf} (50%) delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf diff --git a/docs/en/connector-v2/sink/IoTDB.md b/docs/en/connector-v2/sink/IoTDB.md index 31389c03f74..3affa6f8c55 100644 --- a/docs/en/connector-v2/sink/IoTDB.md +++ b/docs/en/connector-v2/sink/IoTDB.md @@ -23,25 +23,26 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee ## Options -| name | type | required | default value | -|-------------------------------|-------------------|----------|---------------| -| node_urls | list | yes | - | -| username | string | yes | - | -| password | string | yes | - | -| batch_size | int | no | 1024 | -| batch_interval_ms | int | no | - | -| max_retries | int | no | - | -| 
retry_backoff_multiplier_ms | int | no | - | -| max_retry_backoff_ms | int | no | - | -| default_thrift_buffer_size | int | no | - | -| max_thrift_frame_size | int | no | - | -| zone_id | string | no | - | -| enable_rpc_compression | boolean | no | - | -| connection_timeout_in_ms | int | no | - | -| timeseries_options | list | no | - | -| timeseries_options.path | string | no | - | -| timeseries_options.data_type | string | no | - | -| common-options | string | no | - | +| name | type | required | default value | +|-------------------------------|-------------------|----------|-----------------------------------| +| node_urls | list | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| key_device | string | yes | - | +| key_timestamp | string | no | processing time | +| key_measurement_fields | array | no | exclude `device` & `timestamp` | +| storage_group | string | no | - | +| batch_size | int | no | 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| max_retry_backoff_ms | int | no | - | +| default_thrift_buffer_size | int | no | - | +| max_thrift_frame_size | int | no | - | +| zone_id | string | no | - | +| enable_rpc_compression | boolean | no | - | +| connection_timeout_in_ms | int | no | - | +| common-options | string | no | - | ### node_urls [list] @@ -55,6 +56,24 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee `IoTDB` user password +### key_device [string] + +Specify field name of the `IoTDB` deviceId in SeaTunnelRow + +### key_timestamp [string] + +Specify field-name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp + +### key_measurement_fields [array] + +Specify field-name of the `IoTDB` measurement list in SeaTunnelRow. 
If not specified, include all fields but exclude `device` & `timestamp` + +### storage_group [string] + +Specify device storage group(path prefix) + +example: deviceId = ${storage_group} + "." + ${key_device} + ### batch_size [int] For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the IoTDB @@ -95,24 +114,16 @@ Enable rpc compression in `IoTDB` client The maximum time (in ms) to wait when connect `IoTDB` -### timeseries_options [list] - -Timeseries options - -### timeseries_options.path [string] - -Timeseries path - -### timeseries_options.data_type [string] - -Timeseries data type - ### common options [string] Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details ## Examples +### Case1 + +Common options: + ```hocon sink { IoTDB { @@ -123,4 +134,72 @@ sink { batch_interval_ms = 1000 } } -``` \ No newline at end of file +``` + +When you set `key_device` to `device_name`, for example: + +```hocon +sink { + IoTDB { + ... 
+ key_device = "device_name" + } +} +``` + +Upstream SeaTunnelRow data format is the following: + +| device_name | field_1 | field_2 | +|----------------------------|-------------|-------------| +| root.test_group.device_a | 1001 | 1002 | +| root.test_group.device_b | 2001 | 2002 | +| root.test_group.device_c | 3001 | 3002 | + +Output to `IoTDB` data format is the following: + +```shell +IoTDB> SELECT * FROM root.test_group.* align by device; ++------------------------+------------------------+-----------+----------+ +| Time| Device| field_1| field_2| ++------------------------+------------------------+----------+-----------+ +|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002| +|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002| +|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002| ++------------------------+------------------------+----------+-----------+ +``` + +### Case2 + +When you assign `key_device`、`key_timestamp`、`key_measurement_fields`, for example: + +```hocon +sink { + IoTDB { + ... 
+ key_device = "device_name" + key_timestamp = "ts" + key_measurement_fields = ["temperature", "moisture"] + } +} +``` + +Upstream SeaTunnelRow data format is the following: + +|ts | device_name | field_1 | field_2 | temperature | moisture | +|--------------------|----------------------------|-------------|-------------|-------------|-------------| +|1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 | +|1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 | +|1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 | + +Output to `IoTDB` data format is the following: + +```shell +IoTDB> SELECT * FROM root.test_group.* align by device; ++------------------------+------------------------+--------------+-----------+ +| Time| Device| temperature| moisture| ++------------------------+------------------------+--------------+-----------+ +|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| +|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| +|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| ++------------------------+------------------------+--------------+-----------+ +``` diff --git a/docs/en/connector-v2/source/IoTDB.md b/docs/en/connector-v2/source/IoTDB.md index 01a3487a387..c8980be9428 100644 --- a/docs/en/connector-v2/source/IoTDB.md +++ b/docs/en/connector-v2/source/IoTDB.md @@ -22,14 +22,14 @@ supports query SQL and can achieve projection effect. 
| name | type | required | default value | |----------------------------|---------|----------|---------------| -| host | string | yes | - | -| port | Int | yes | - | -| node_urls | string | yes | - | -| sql | string | yes | | +| host | string | no | - | +| port | int | no | - | +| node_urls | string | no | - | +| username | string | yes | - | +| password | string | yes | - | +| sql | string | yes | - | | fields | config | yes | - | | fetch_size | int | no | - | -| username | string | no | - | -| password | string | no | - | | lower_bound | long | no | - | | upper_bound | long | no | - | | num_partitions | int | no | - | @@ -147,3 +147,60 @@ lower bound of the time column ``` +## Examples + +### Case1 + +Common options: + +```hocon +source { + IoTDB { + node_urls = "localhost:6667" + username = "root" + password = "root" + } +} +``` + +When you assign `sql`、`fields`、`partition`, for example: + +```hocon +source { + IoTDB { + ... + sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device" + lower_bound = 1 + upper_bound = 4102329600000 + num_partitions = 10 + fields { + ts = bigint + device_name = string + + temperature = float + moisture = bigint + } + } +} +``` + +Upstream `IoTDB` data format is the following: + +```shell +IoTDB> SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device; ++------------------------+------------------------+--------------+-----------+ +| Time| Device| temperature| moisture| ++------------------------+------------------------+--------------+-----------+ +|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| +|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| +|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| ++------------------------+------------------------+--------------+-----------+ +``` + +Loaded to SeaTunnelRow data format is the following: + +|ts | device_name | temperature | moisture | 
+|--------------------|----------------------------|-------------|-------------| +|1664035200001 | root.test_group.device_a | 36.1 | 100 | +|1664035200001 | root.test_group.device_b | 36.2 | 101 | +|1664035200001 | root.test_group.device_c | 36.3 | 102 | diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java index 92c19dbf6f3..9c836fe8e39 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java @@ -24,7 +24,7 @@ public abstract class AbstractSinkWriter implements SinkWriter { @Override - public final Optional prepareCommit() { + public Optional prepareCommit() { return Optional.empty(); } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java index b6320da8f70..f942af672d3 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java @@ -22,16 +22,12 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import lombok.Setter; import lombok.ToString; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import java.io.Serializable; import java.time.ZoneId; -import java.util.ArrayList; import java.util.List; 
@Setter @@ -39,6 +35,10 @@ @ToString public class SinkConfig extends CommonConfig { + public static final String KEY_TIMESTAMP = "key_timestamp"; + public static final String KEY_DEVICE = "key_device"; + public static final String KEY_MEASUREMENT_FIELDS = "key_measurement_fields"; + public static final String STORAGE_GROUP = "storage_group"; public static final String BATCH_SIZE = "batch_size"; public static final String BATCH_INTERVAL_MS = "batch_interval_ms"; public static final String MAX_RETRIES = "max_retries"; @@ -49,12 +49,13 @@ public class SinkConfig extends CommonConfig { public static final String ZONE_ID = "zone_id"; public static final String ENABLE_RPC_COMPRESSION = "enable_rpc_compression"; public static final String CONNECTION_TIMEOUT_IN_MS = "connection_timeout_in_ms"; - public static final String TIMESERIES_OPTIONS = "timeseries_options"; - public static final String TIMESERIES_OPTION_PATH = "path"; - public static final String TIMESERIES_OPTION_DATA_TYPE = "data_type"; private static final int DEFAULT_BATCH_SIZE = 1024; + private String keyTimestamp; + private String keyDevice; + private List keyMeasurementFields; + private String storageGroup; private int batchSize = DEFAULT_BATCH_SIZE; private Integer batchIntervalMs; private int maxRetries; @@ -65,7 +66,6 @@ public class SinkConfig extends CommonConfig { private ZoneId zoneId; private Boolean enableRPCCompression; private Integer connectionTimeoutInMs; - private List timeseriesOptions; public SinkConfig(@NonNull List nodeUrls, @NonNull String username, @@ -78,6 +78,17 @@ public static SinkConfig loadConfig(Config pluginConfig) { pluginConfig.getStringList(NODE_URLS), pluginConfig.getString(USERNAME), pluginConfig.getString(PASSWORD)); + + sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE)); + if (pluginConfig.hasPath(KEY_TIMESTAMP)) { + sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP)); + } + if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS)) { + 
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS)); + } + if (pluginConfig.hasPath(STORAGE_GROUP)) { + sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP)); + } if (pluginConfig.hasPath(BATCH_SIZE)) { int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE)); sinkConfig.setBatchSize(batchSize); @@ -117,18 +128,6 @@ public static SinkConfig loadConfig(Config pluginConfig) { checkNotNull(sinkConfig.getEnableRPCCompression()); sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs); } - if (pluginConfig.hasPath(TIMESERIES_OPTIONS)) { - List timeseriesConfigs = pluginConfig.getConfigList(TIMESERIES_OPTIONS); - List timeseriesOptions = new ArrayList<>(timeseriesConfigs.size()); - for (Config timeseriesConfig : timeseriesConfigs) { - String timeseriesPath = timeseriesConfig.getString(TIMESERIES_OPTION_PATH); - String timeseriesDataType = timeseriesConfig.getString(TIMESERIES_OPTION_DATA_TYPE); - TimeseriesOption timeseriesOption = new TimeseriesOption( - timeseriesPath, TSDataType.valueOf(timeseriesDataType)); - timeseriesOptions.add(timeseriesOption); - } - sinkConfig.setTimeseriesOptions(timeseriesOptions); - } return sinkConfig; } @@ -136,12 +135,4 @@ private static int checkIntArgument(int args) { checkArgument(args > 0); return args; } - - @Getter - @ToString - @AllArgsConstructor - public static class TimeseriesOption implements Serializable { - private String path; - private TSDataType dataType = TSDataType.TEXT; - } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java index 25b657cfa90..c6167454c8b 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java +++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java @@ -27,6 +27,8 @@ public class SourceConstants { public static final String SQL_WHERE = "where"; + public static final String SQL_ALIGN = "align by"; + public static final String DEFAULT_PARTITIONS = "0"; } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..4ab773cbcd7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import lombok.AllArgsConstructor; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +import java.time.ZoneOffset; +import java.util.Date; +import java.util.List; + +@AllArgsConstructor +public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer { + + private final SeaTunnelRowType rowType; + + @Override + public SeaTunnelRow deserialize(RowRecord rowRecord) { + return convert(rowRecord); + } + + private SeaTunnelRow convert(RowRecord rowRecord) { + long timestamp = rowRecord.getTimestamp(); + List fields = rowRecord.getFields(); + if (fields.size() != (rowType.getTotalFields() - 1)) { + throw new IllegalStateException("Illegal SeaTunnelRowType: " + rowRecord); + } + + Object[] seaTunnelFields = new Object[rowType.getTotalFields()]; + seaTunnelFields[0] = convertTimestamp(timestamp, rowType.getFieldType(0)); + for (int i = 1; i < rowType.getTotalFields(); i++) { + Field field = fields.get(i - 1); + if (field == null || field.getDataType() == null) { + seaTunnelFields[i] = null; + continue; + } + SeaTunnelDataType seaTunnelFieldType = rowType.getFieldType(i); + seaTunnelFields[i] = convert(seaTunnelFieldType, field); + } + return new SeaTunnelRow(seaTunnelFields); + } + + private Object convert(SeaTunnelDataType seaTunnelFieldType, + Field field) { + switch (field.getDataType()) { + case INT32: + Number int32 = field.getIntV(); + switch (seaTunnelFieldType.getSqlType()) { + case TINYINT: + return int32.byteValue(); + case SMALLINT: + return int32.shortValue(); + case INT: + return int32.intValue(); + default: + throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType); + } + case INT64: + return 
field.getLongV(); + case FLOAT: + return field.getFloatV(); + case DOUBLE: + return field.getDoubleV(); + case TEXT: + return field.getStringValue(); + case BOOLEAN: + return field.getBoolV(); + default: + throw new IllegalArgumentException("unknown TSData type: " + field.getDataType()); + } + } + + private Object convertTimestamp(long timestamp, + SeaTunnelDataType seaTunnelFieldType) { + switch (seaTunnelFieldType.getSqlType()) { + case TIMESTAMP: + return new Date(timestamp) + .toInstant() + .atZone(ZoneOffset.UTC) + .toLocalDateTime(); + case BIGINT: + return timestamp; + default: + throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType); + } + } +} diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java index b0ba1e288de..d7cf508f853 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java @@ -17,161 +17,181 @@ package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize; -import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.TimeseriesOption; -import static com.google.common.base.Preconditions.checkArgument; - +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import com.google.common.base.Strings; +import lombok.NonNull; 
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { - private static final String FIELD_DEVICE = "device"; - private static final String FIELD_TIMESTAMP = "timestamp"; - private static final String FIELD_MEASUREMENTS = "measurements"; - private static final String FIELD_TYPES = "types"; - private static final String FIELD_VALUES = "values"; - private static final String SEPARATOR = ","; - - private final SeaTunnelRowType seaTunnelRowType; - private final Map timeseriesOptionMap; - private final Function deviceExtractor; private final Function timestampExtractor; - private final Function> measurementsExtractor; - private final Function> typesExtractor; - - public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, - List timeseriesOptions) { - validateRowTypeSchema(seaTunnelRowType); - - this.seaTunnelRowType = seaTunnelRowType; - this.timeseriesOptionMap = Optional.ofNullable(timeseriesOptions) - .orElse(Collections.emptyList()).stream() - .collect(Collectors.toMap(option -> option.getPath(), option -> option)); - - final List rowTypeFields = Arrays.asList(seaTunnelRowType.getFieldNames()); - final int deviceIndex = seaTunnelRowType.indexOf(FIELD_DEVICE); - this.deviceExtractor = seaTunnelRow -> seaTunnelRow.getField(deviceIndex).toString(); - final int timestampIndex = seaTunnelRowType.indexOf(FIELD_TIMESTAMP); - this.timestampExtractor = rowTypeFields.contains(FIELD_TIMESTAMP) ? 
- seaTunnelRow -> Long.parseLong(seaTunnelRow.getField(timestampIndex).toString()) : - seaTunnelRow -> System.currentTimeMillis(); - final int measurementsIndex = seaTunnelRowType.indexOf(FIELD_MEASUREMENTS); - this.measurementsExtractor = seaTunnelRow -> - Arrays.asList(seaTunnelRow.getField(measurementsIndex).toString().split(SEPARATOR)); - final boolean containsTypesField = rowTypeFields.contains(FIELD_TYPES); - final int typesIndex = containsTypesField ? seaTunnelRowType.indexOf(FIELD_TYPES) : -1; - this.typesExtractor = seaTunnelRow -> { - if (!containsTypesField) { - return null; - } - return Arrays.stream(seaTunnelRow.getField(typesIndex).toString().split(SEPARATOR)) - .map(type -> TSDataType.valueOf(type)) - .collect(Collectors.toList()); - }; + private final Function deviceExtractor; + private final Function> valuesExtractor; + private final List measurements; + private final List measurementsType; + + public DefaultSeaTunnelRowSerializer(@NonNull SeaTunnelRowType seaTunnelRowType, + String storageGroup, + String timestampKey, + @NonNull String deviceKey, + List measurementKeys) { + this.timestampExtractor = createTimestampExtractor(seaTunnelRowType, timestampKey); + this.deviceExtractor = createDeviceExtractor(seaTunnelRowType, deviceKey, storageGroup); + this.measurements = createMeasurements(seaTunnelRowType, timestampKey, deviceKey, measurementKeys); + this.measurementsType = createMeasurementTypes(seaTunnelRowType, measurements); + this.valuesExtractor = createValuesExtractor(seaTunnelRowType, measurements, measurementsType); } @Override public IoTDBRecord serialize(SeaTunnelRow seaTunnelRow) { - String device = deviceExtractor.apply(seaTunnelRow); Long timestamp = timestampExtractor.apply(seaTunnelRow); - List measurements = measurementsExtractor.apply(seaTunnelRow); - List types = typesExtractor.apply(seaTunnelRow); - List values = extractValues(device, measurements, types, seaTunnelRow); - return new IoTDBRecord(device, timestamp, measurements, 
types, values); - } - - private void validateRowTypeSchema(SeaTunnelRowType seaTunnelRowType) throws IllegalArgumentException { - List rowTypeFields = Lists.newArrayList(seaTunnelRowType.getFieldNames()); - checkArgument(rowTypeFields.contains(FIELD_DEVICE)); - checkArgument(rowTypeFields.contains(FIELD_MEASUREMENTS)); - checkArgument(rowTypeFields.contains(FIELD_VALUES)); - - rowTypeFields.remove(FIELD_DEVICE); - rowTypeFields.remove(FIELD_TIMESTAMP); - rowTypeFields.remove(FIELD_MEASUREMENTS); - rowTypeFields.remove(FIELD_TYPES); - rowTypeFields.remove(FIELD_VALUES); - checkArgument(rowTypeFields.isEmpty(), - "Illegal SeaTunnelRowType fields: " + rowTypeFields); + String device = deviceExtractor.apply(seaTunnelRow); + List values = valuesExtractor.apply(seaTunnelRow); + return new IoTDBRecord(device, timestamp, measurements, measurementsType, values); } - private List extractValues(String device, - List measurements, - List tsDataTypes, - SeaTunnelRow seaTunnelRow) { - int valuesIndex = seaTunnelRowType.indexOf(FIELD_VALUES); - String[] valuesStr = StringUtils.trim( - seaTunnelRow.getField(valuesIndex).toString()).split(SEPARATOR); - if (tsDataTypes == null || tsDataTypes.isEmpty()) { - convertTextValues(device, measurements, valuesStr); - return Arrays.asList(valuesStr); + private Function createTimestampExtractor(SeaTunnelRowType seaTunnelRowType, + String timestampKey) { + if (Strings.isNullOrEmpty(timestampKey)) { + return row -> System.currentTimeMillis(); } - List values = new ArrayList<>(); - for (int i = 0; i < valuesStr.length; i++) { - TSDataType tsDataType = tsDataTypes.get(i); - switch (tsDataType) { - case INT32: - values.add(Integer.valueOf(valuesStr[i])); - break; - case INT64: - values.add(Long.valueOf(valuesStr[i])); - break; - case FLOAT: - values.add(Float.valueOf(valuesStr[i])); - break; - case DOUBLE: - values.add(Double.valueOf(valuesStr[i])); - break; - case BOOLEAN: - values.add(Boolean.valueOf(valuesStr[i])); - break; - case TEXT: - 
String value = valuesStr[i]; - if (!value.startsWith("\"") && !value.startsWith("'")) { - value = convertToTextValue(value); - } - values.add(value); - break; + int timestampFieldIndex = seaTunnelRowType.indexOf(timestampKey); + return row -> { + Object timestamp = row.getField(timestampFieldIndex); + if (timestamp == null) { + return System.currentTimeMillis(); + } + SeaTunnelDataType timestampFieldType = seaTunnelRowType.getFieldType(timestampFieldIndex); + switch (timestampFieldType.getSqlType()) { + case STRING: + return Long.parseLong((String) timestamp); + case TIMESTAMP: + return LocalDateTime.class.cast(timestamp) + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + case BIGINT: + return (Long) timestamp; default: - throw new UnsupportedOperationException("Unsupported dataType: " + tsDataType); + throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType); + } + }; + } + + private Function createDeviceExtractor(SeaTunnelRowType seaTunnelRowType, + String deviceKey, + String storageGroup) { + int deviceIndex = seaTunnelRowType.indexOf(deviceKey); + return seaTunnelRow -> { + String device = seaTunnelRow.getField(deviceIndex).toString(); + if (Strings.isNullOrEmpty(storageGroup)) { + return device; + } + if (storageGroup.endsWith(".") || device.startsWith(".")) { + return storageGroup + device; } + return storageGroup + "." 
+ device; + }; + } + + private List createMeasurements(SeaTunnelRowType seaTunnelRowType, + String timestampKey, + String deviceKey, + List measurementKeys) { + if (measurementKeys == null || measurementKeys.isEmpty()) { + return Stream.of(seaTunnelRowType.getFieldNames()) + .filter(name -> !name.equals(deviceKey)) + .filter(name -> !name.equals(timestampKey)) + .collect(Collectors.toList()); } - return values; + return measurementKeys; + } + + private List createMeasurementTypes(SeaTunnelRowType seaTunnelRowType, + List measurements) { + return measurements.stream() + .map(measurement -> { + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(measurement); + SeaTunnelDataType seaTunnelType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow); + return convert(seaTunnelType); + }) + .collect(Collectors.toList()); } - private void convertTextValues(String device, List measurements, String[] values) { - if (device != null - && measurements != null - && values != null - && !timeseriesOptionMap.isEmpty() - && measurements.size() == values.length) { + private Function> createValuesExtractor(SeaTunnelRowType seaTunnelRowType, + List measurements, + List measurementTypes) { + return row -> { + List measurementValues = new ArrayList<>(measurements.size()); for (int i = 0; i < measurements.size(); i++) { - String measurement = device + TsFileConstant.PATH_SEPARATOR + measurements.get(i); - TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement); - if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) { - // The TEXT data type should be covered by " or ' - values[i] = convertToTextValue(values[i]); - } + String measurement = measurements.get(i); + TSDataType measurementDataType = measurementsType.get(i); + + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(measurement); + SeaTunnelDataType seaTunnelDataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow); + Object seaTunnelFieldValue = 
row.getField(indexOfSeaTunnelRow); + + Object measurementValue = convert(seaTunnelDataType, measurementDataType, seaTunnelFieldValue); + measurementValues.add(measurementValue); } + return measurementValues; + }; + } + + private static TSDataType convert(SeaTunnelDataType dataType) { + switch (dataType.getSqlType()) { + case STRING: + return TSDataType.TEXT; + case BOOLEAN: + return TSDataType.BOOLEAN; + case TINYINT: + case SMALLINT: + case INT: + return TSDataType.INT32; + case BIGINT: + return TSDataType.INT64; + case FLOAT: + return TSDataType.FLOAT; + case DOUBLE: + return TSDataType.DOUBLE; + default: + throw new UnsupportedOperationException("Unsupported dataType: " + dataType); } } - private String convertToTextValue(Object value) { - return "'" + value + "'"; + private static Object convert(SeaTunnelDataType seaTunnelType, + TSDataType tsDataType, + Object value) { + if (value == null) { + return null; + } + switch (tsDataType) { + case INT32: + return ((Number) value).intValue(); + case INT64: + return ((Number) value).longValue(); + case FLOAT: + return ((Number) value).floatValue(); + case DOUBLE: + return ((Number) value).doubleValue(); + case BOOLEAN: + return Boolean.valueOf((Boolean) value); + case TEXT: + return value.toString(); + default: + throw new UnsupportedOperationException("Unsupported dataType: " + tsDataType); + } } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..5c00ba63a5e --- /dev/null +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowDeserializer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.iotdb.tsfile.read.common.RowRecord; + +public interface SeaTunnelRowDeserializer { + + SeaTunnelRow deserialize(RowRecord rowRecord); +} diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java index c56204c38ad..a9b095496b2 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java @@ -131,7 +131,7 @@ public synchronized void close() throws IOException { } } - private synchronized void flush() throws IOException { + synchronized void flush() throws IOException { checkFlushException(); if (batchList.isEmpty()) { return; diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java index fc37c5434fc..70dbd93dc1f 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java @@ -27,9 +27,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.Optional; @Slf4j public class IoTDBSinkWriter extends AbstractSinkWriter { @@ -41,7 +43,8 @@ public IoTDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig); this.serializer = new DefaultSeaTunnelRowSerializer( - seaTunnelRowType, sinkConfig.getTimeseriesOptions()); + seaTunnelRowType, sinkConfig.getStorageGroup(), sinkConfig.getKeyTimestamp(), + sinkConfig.getKeyDevice(), sinkConfig.getKeyMeasurementFields()); this.sinkClient = new IoTDBSinkClient(sinkConfig); } @@ -51,6 +54,14 @@ public void write(SeaTunnelRow element) throws IOException { sinkClient.write(record); } + @SneakyThrows + @Override + public Optional prepareCommit() { + // Flush to storage before snapshot state is performed + sinkClient.flush(); + return super.prepareCommit(); + } + @Override public void close() throws IOException { sinkClient.close(); diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java index 668d2861d66..592c0739ede 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java +++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java @@ -94,7 +94,7 @@ public SourceSplitEnumerator createEnumerato @Override public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, IoTDBSourceState checkpointState) throws Exception { - return new IoTDBSourceSplitEnumerator(enumeratorContext, checkpointState, configParams); + return new IoTDBSourceSplitEnumerator(enumeratorContext, configParams, checkpointState); } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java index 2fcac5ff48f..a7e8aaa8a46 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java @@ -34,44 +34,47 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.DefaultSeaTunnelRowDeserializer; +import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.SeaTunnelRowDeserializer; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.SessionDataSet; import org.apache.iotdb.session.util.Version; -import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; 
-import java.util.Set; +import java.util.Queue; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j public class IoTDBSourceReader implements SourceReader { - private static final long THREAD_WAIT_TIME = 500L; + private final Map conf; - private Map conf; - - private Set sourceSplits; + private final Queue pendingSplits; private final SourceReader.Context context; - private SeaTunnelRowType seaTunnelRowType; + private final SeaTunnelRowDeserializer deserializer; private Session session; - public IoTDBSourceReader(Map conf, SourceReader.Context readerContext, SeaTunnelRowType seaTunnelRowType) { + private volatile boolean noMoreSplitsAssignment; + + public IoTDBSourceReader(Map conf, + SourceReader.Context readerContext, + SeaTunnelRowType rowType) { this.conf = conf; - this.sourceSplits = new HashSet<>(); + this.pendingSplits = new LinkedList<>(); this.context = readerContext; - this.seaTunnelRowType = seaTunnelRowType; + this.deserializer = new DefaultSeaTunnelRowDeserializer(rowType); } @Override @@ -92,21 +95,18 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - if (sourceSplits.isEmpty()) { - Thread.sleep(THREAD_WAIT_TIME); - return; - } - sourceSplits.forEach(source -> { - try { - read(source, output); - } catch (Exception e) { - throw new RuntimeException("IotDB source read error", e); + while (!pendingSplits.isEmpty()) { + synchronized (output.getCheckpointLock()) { + IoTDBSourceSplit split = pendingSplits.poll(); + read(split, output); } - }); + } - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + if (Boundedness.BOUNDED.equals(context.getBoundedness()) + && noMoreSplitsAssignment + && pendingSplits.isEmpty()) { // signal to the source that we have reached the end of the data. 
- log.info("Closed the bounded fake source"); + log.info("Closed the bounded iotdb source"); context.signalNoMoreElement(); } } @@ -114,37 +114,13 @@ public void pollNext(Collector output) throws Exception { private void read(IoTDBSourceSplit split, Collector output) throws Exception { try (SessionDataSet dataSet = session.executeQueryStatement(split.getQuery())) { while (dataSet.hasNext()) { - RowRecord row = dataSet.next(); - Object[] datas = new Object[row.getFields().size()]; - for (int i = 0; i < row.getFields().size(); i++) { - row.getFields().get(i).getDataType(); - datas[i] = convertToDataType(row.getFields().get(i)); - } - output.collect(new SeaTunnelRow(datas)); + RowRecord rowRecord = dataSet.next(); + SeaTunnelRow seaTunnelRow = deserializer.deserialize(rowRecord); + output.collect(seaTunnelRow); } } } - private Object convertToDataType(Field field) { - - switch (field.getDataType()) { - case INT32: - return field.getIntV(); - case INT64: - return field.getLongV(); - case FLOAT: - return field.getFloatV(); - case DOUBLE: - return field.getDoubleV(); - case TEXT: - return field.getStringValue(); - case BOOLEAN: - return field.getBoolV(); - default: - throw new IllegalArgumentException("unknown TSData type: " + field.getDataType()); - } - } - private Session buildSession(Map conf) { Session.Builder sessionBuilder = new Session.Builder(); if (conf.containsKey(HOST)) { @@ -185,17 +161,18 @@ private Session buildSession(Map conf) { @Override public List snapshotState(long checkpointId) { - return new ArrayList<>(sourceSplits); + return new ArrayList<>(pendingSplits); } @Override public void addSplits(List splits) { - sourceSplits.addAll(splits); + pendingSplits.addAll(splits); } @Override public void handleNoMoreSplits() { - // do nothing + log.info("Reader received NoMoreSplits event."); + noMoreSplitsAssignment = true; } @Override diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java index 51331805405..8b38f3abde7 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.api.source.SourceSplit; +import lombok.ToString; + +@ToString public class IoTDBSourceSplit implements SourceSplit { private static final long serialVersionUID = -1L; diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java index 01852a48836..5d7a83c5985 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java @@ -22,13 +22,16 @@ import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL; import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND; import static org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.DEFAULT_PARTITIONS; +import static org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.SQL_ALIGN; import static org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.SQL_WHERE; import static 
org.apache.iotdb.tsfile.common.constant.QueryConstant.RESERVED_TIME; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState; +import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,13 +41,9 @@ import java.util.Map; import java.util.Set; +@Slf4j public class IoTDBSourceSplitEnumerator implements SourceSplitEnumerator { - private final Context context; - private Set pendingSplit; - private Set assignedSplit; - private Map conf; - /** * A SQL statement can contain at most one where * We split the SQL using the where keyword @@ -52,26 +51,51 @@ public class IoTDBSourceSplitEnumerator implements SourceSplitEnumerator context, Map conf) { - this.context = context; - this.conf = conf; + private final Object stateLock = new Object(); + private final Context context; + private final Map conf; + private final Map> pendingSplit; + private volatile boolean shouldEnumerate; + + public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, + Map conf) { + this(context, conf, null); } - public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, IoTDBSourceState sourceState, Map conf) { - this(context, conf); - this.assignedSplit = sourceState.getAssignedSplit(); + public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, + Map conf, + IoTDBSourceState sourceState) { + this.context = context; + this.conf = conf; + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = sourceState == null; + if (sourceState != null) { + this.shouldEnumerate = sourceState.isShouldEnumerate(); + this.pendingSplit.putAll(sourceState.getPendingSplit()); + } } @Override public void open() { - this.assignedSplit = new HashSet<>(); - this.pendingSplit = new HashSet<>(); } @Override public void run() { - 
pendingSplit = getIotDBSplit(); - assignSplit(context.registeredReaders()); + Set readers = context.registeredReaders(); + if (shouldEnumerate) { + Set newSplits = getIotDBSplit(); + + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } + + assignSplit(readers); + } + + log.debug("No more splits to assign." + + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); } /** @@ -99,12 +123,24 @@ private Set getIotDBSplit() { long start = Long.parseLong(conf.get(LOWER_BOUND).toString()); long end = Long.parseLong(conf.get(UPPER_BOUND).toString()); int numPartitions = Integer.parseInt(conf.get(NUM_PARTITIONS).toString()); - String[] sqls = sql.split(SQL_WHERE); + String sqlBase = sql; + String sqlAlign = null; + String sqlCondition = null; + String[] sqls = sqlBase.split("(?i)" + SQL_ALIGN); + if (sqls.length > 1) { + sqlBase = sqls[0]; + sqlAlign = sqls[1]; + } + sqls = sqlBase.split("(?i)" + SQL_WHERE); if (sqls.length > SQL_WHERE_SPLIT_LENGTH) { throw new IllegalArgumentException("sql should not contain more than one where"); } - int size = (int) (end - start) / numPartitions + 1; - int remainder = (int) ((end + 1 - start) % numPartitions); + if (sqls.length > 1) { + sqlBase = sqls[0]; + sqlCondition = sqls[1]; + } + long size = (end - start) / numPartitions + 1; + long remainder = (end + 1 - start) % numPartitions; if (end - start < numPartitions) { numPartitions = (int) (end - start); } @@ -117,19 +153,24 @@ private Set getIotDBSplit() { if (i + 1 <= numPartitions) { currentStart = currentStart - remainder; } - query = sqls[0] + query; - if (sqls.length > 1) { - query = query + " and ( " + sqls[1] + " ) "; + query = sqlBase + query; + if (!Strings.isNullOrEmpty(sqlCondition)) { + query = query + " and ( " + sqlCondition + " ) "; + } + if (!Strings.isNullOrEmpty(sqlAlign)) { + query = query + " align by " + sqlAlign; } - iotDBSourceSplits.add(new IoTDBSourceSplit(String.valueOf(i + 
System.nanoTime()), query)); + iotDBSourceSplits.add(new IoTDBSourceSplit(String.valueOf(query.hashCode()), query)); } return iotDBSourceSplits; } @Override public void addSplitsBack(List splits, int subtaskId) { + log.debug("Add back splits {} to IoTDBSourceSplitEnumerator.", + splits); if (!splits.isEmpty()) { - pendingSplit.addAll(splits); + addPendingSplit(splits); assignSplit(Collections.singletonList(subtaskId)); } } @@ -141,30 +182,51 @@ public int currentUnassignedSplitSize() { @Override public void registerReader(int subtaskId) { + log.debug("Register reader {} to IoTDBSourceSplitEnumerator.", + subtaskId); if (!pendingSplit.isEmpty()) { assignSplit(Collections.singletonList(subtaskId)); } } - private void assignSplit(Collection taskIDList) { - Map> readySplit = new HashMap<>(Common.COLLECTION_SIZE); - for (int taskID : taskIDList) { - readySplit.computeIfAbsent(taskID, id -> new ArrayList<>()); + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (IoTDBSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + log.info("Assigning {} to {} reader.", split, ownerReader); + pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()) + .add(split); + } + } + + private void assignSplit(Collection readers) { + log.debug("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List assignmentForReader = pendingSplit.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info("Assign splits {} to reader {}", + assignmentForReader, reader); + try { + context.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error("Failed to assign splits {} to reader {}", + assignmentForReader, reader, e); + pendingSplit.put(reader, assignmentForReader); + } + } } - pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size())) - .add(s)); - 
readySplit.forEach(context::assignSplit); - assignedSplit.addAll(pendingSplit); - pendingSplit.clear(); } @Override public IoTDBSourceState snapshotState(long checkpointId) throws Exception { - return new IoTDBSourceState(assignedSplit); + synchronized (stateLock) { + return new IoTDBSourceState(shouldEnumerate, pendingSplit); + } } private static int getSplitOwner(String tp, int numReaders) { - return tp.hashCode() % numReaders; + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; } @Override @@ -179,6 +241,6 @@ public void close() { @Override public void handleSplitRequest(int subtaskId) { - //nothing to do + throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId); } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java index 924cecd24cb..b6588c7dc78 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java @@ -19,18 +19,17 @@ import org.apache.seatunnel.connectors.seatunnel.iotdb.source.IoTDBSourceSplit; +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.io.Serializable; -import java.util.Set; +import java.util.List; +import java.util.Map; +@AllArgsConstructor +@Getter public class IoTDBSourceState implements Serializable { - private Set assignedSplit; - - public IoTDBSourceState(Set assignedSplit) { - this.assignedSplit = assignedSplit; - } - - public Set getAssignedSplit() { - return assignedSplit; - } + private boolean shouldEnumerate; + private Map> pendingSplit; } diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java deleted file mode 100644 index 704d23228ac..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.e2e.flink.v2.iotdb; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.e2e.flink.FlinkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.SessionDataSet; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.Field; -import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class FakeSourceToIoTDBIT extends FlinkContainer { - - private static final String IOTDB_DOCKER_IMAGE = "apache/iotdb:0.13.1-node"; - private static final String IOTDB_HOST = "flink_e2e_iotdb_sink"; - private static final int IOTDB_PORT = 6667; - private static final String IOTDB_USERNAME = "root"; - private static final String IOTDB_PASSWORD = "root"; - - private GenericContainer iotdbServer; - private Session session; - - @BeforeEach - public void startIoTDBContainer() throws Exception { - iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(IOTDB_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - iotdbServer.setPortBindings(Lists.newArrayList( - String.format("%s:6667", IOTDB_PORT))); - Startables.deepStart(Stream.of(iotdbServer)).join(); - 
log.info("IoTDB container started"); - // wait for IoTDB fully start - session = createSession(); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> session.open()); - initIoTDBTimeseries(); - } - - /** - * fake source -> IoTDB sink - */ - @Test - public void testFakeSourceToIoTDB() throws Exception { - Container.ExecResult execResult = executeSeaTunnelFlinkJob("/iotdb/fakesource_to_iotdb.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - // query result - SessionDataSet dataSet = session.executeQueryStatement("select status, value from root.ln.d1"); - List actual = new ArrayList<>(); - while (dataSet.hasNext()) { - RowRecord row = dataSet.next(); - List fields = row.getFields(); - Field status = fields.get(0); - Field val = fields.get(1); - actual.add(Arrays.asList(status.getBoolV(), val.getLongV())); - } - List expected = Arrays.asList( - Arrays.asList(Boolean.TRUE, Long.valueOf(1001)), - Arrays.asList(Boolean.FALSE, Long.valueOf(1002))); - Assertions.assertIterableEquals(expected, actual); - } - - private Session createSession() { - return new Session.Builder() - .host("localhost") - .port(IOTDB_PORT) - .username(IOTDB_USERNAME) - .password(IOTDB_PASSWORD) - .build(); - } - - private void initIoTDBTimeseries() throws Exception { - session.setStorageGroup("root.ln"); - session.createTimeseries("root.ln.d1.status", - TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY); - session.createTimeseries("root.ln.d1.value", - TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY); - } - - @AfterEach - public void closeIoTDBContainer() { - if (iotdbServer != null) { - iotdbServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java new file mode 100644 index 00000000000..29dde429f1b --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.flink.v2.iotdb; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class IoTDBIT extends FlinkContainer { + + private static final String IOTDB_DOCKER_IMAGE = "apache/iotdb:0.13.1-node"; + private static final String IOTDB_HOST = "flink_e2e_iotdb_sink"; + private static final int IOTDB_PORT = 6667; + private static final String IOTDB_USERNAME = "root"; + private static final String IOTDB_PASSWORD = "root"; + private static final String SOURCE_GROUP = "root.source_group"; + private static final String SINK_GROUP = "root.sink_group"; + + private GenericContainer iotdbServer; + private Session session; + private 
List testDataset; + + @BeforeEach + public void startIoTDBContainer() throws Exception { + iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(IOTDB_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + iotdbServer.setPortBindings(Lists.newArrayList( + String.format("%s:6667", IOTDB_PORT))); + Startables.deepStart(Stream.of(iotdbServer)).join(); + log.info("IoTDB container started"); + // wait for IoTDB fully start + session = createSession(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> session.open()); + testDataset = generateTestDataSet(); + } + + @Test + public void testIoTDB() throws Exception { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/iotdb/iotdb_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + List sinkDataset = readSinkDataset(); + assertDatasetEquals(testDataset, sinkDataset); + } + + private Session createSession() { + return new Session.Builder() + .host("localhost") + .port(IOTDB_PORT) + .username(IOTDB_USERNAME) + .password(IOTDB_PASSWORD) + .build(); + } + + private List generateTestDataSet() throws IoTDBConnectionException, StatementExecutionException { + session.setStorageGroup(SOURCE_GROUP); + session.setStorageGroup(SINK_GROUP); + + String[] deviceIds = new String[] { + "device_a", + "device_b" + }; + LinkedHashMap measurements = new LinkedHashMap<>(); + measurements.put("c_string", TSDataType.TEXT); + measurements.put("c_boolean", TSDataType.BOOLEAN); + measurements.put("c_tinyint", TSDataType.INT32); + measurements.put("c_smallint", TSDataType.INT32); + measurements.put("c_int", TSDataType.INT32); + measurements.put("c_bigint", TSDataType.INT64); + measurements.put("c_float", TSDataType.FLOAT); + measurements.put("c_double", TSDataType.DOUBLE); + + List rowRecords = new ArrayList<>(); + for (String deviceId : 
deviceIds) { + String devicePath = String.format("%s.%s", SOURCE_GROUP, deviceId); + ArrayList measurementKeys = new ArrayList<>(measurements.keySet()); + for (String measurement : measurements.keySet()) { + session.createTimeseries(String.format("%s.%s", devicePath, measurement), + measurements.get(measurement), TSEncoding.PLAIN, CompressionType.SNAPPY); + session.createTimeseries(String.format("%s.%s.%s", SINK_GROUP, deviceId, measurement), + measurements.get(measurement), TSEncoding.PLAIN, CompressionType.SNAPPY); + } + + for (int rowCount = 0; rowCount < 100; rowCount++) { + long timestamp = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(rowCount); + RowRecord record = new RowRecord(timestamp); + record.addField(new Binary(deviceId), TSDataType.TEXT); + record.addField(Boolean.FALSE, TSDataType.BOOLEAN); + record.addField(Byte.valueOf(Byte.MAX_VALUE).intValue(), TSDataType.INT32); + record.addField(Short.valueOf(Short.MAX_VALUE).intValue(), TSDataType.INT32); + record.addField(Integer.valueOf(rowCount), TSDataType.INT32); + record.addField(Long.MAX_VALUE, TSDataType.INT64); + record.addField(Float.MAX_VALUE, TSDataType.FLOAT); + record.addField(Double.MAX_VALUE, TSDataType.DOUBLE); + rowRecords.add(record); + log.info("TestDataSet row: {}", record); + + session.insertRecord(devicePath, + record.getTimestamp(), + measurementKeys, + record.getFields().stream().map(f -> f.getDataType()).collect(Collectors.toList()), + record.getFields().stream().map(f -> f.getObjectValue(f.getDataType())).collect(Collectors.toList())); + + } + } + return rowRecords; + } + + private List readSinkDataset() throws IoTDBConnectionException, StatementExecutionException { + SessionDataSet dataSet = session.executeQueryStatement("SELECT c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM " + SINK_GROUP + ".* align by device"); + List results = new ArrayList<>(); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + List 
notContainDeviceField = record.getFields().stream() + .filter(field -> !field.getStringValue().startsWith(SINK_GROUP)) + .collect(Collectors.toList()); + record = new RowRecord(record.getTimestamp(), notContainDeviceField); + results.add(record); + log.info("SinkDataset row: {}", record); + } + return results; + } + + private void assertDatasetEquals(List testDataset, List sinkDataset) { + Assertions.assertEquals(testDataset.size(), sinkDataset.size()); + + Collections.sort(testDataset, Comparator.comparingLong(RowRecord::getTimestamp)); + Collections.sort(sinkDataset, Comparator.comparingLong(RowRecord::getTimestamp)); + for (int rowIndex = 0; rowIndex < testDataset.size(); rowIndex++) { + RowRecord testDatasetRow = testDataset.get(rowIndex); + RowRecord sinkDatasetRow = sinkDataset.get(rowIndex); + Assertions.assertEquals(testDatasetRow.getTimestamp(), sinkDatasetRow.getTimestamp()); + + List testDatasetRowFields = testDatasetRow.getFields(); + List sinkDatasetRowFields = sinkDatasetRow.getFields(); + Assertions.assertEquals(testDatasetRowFields.size(), sinkDatasetRowFields.size()); + for (int fieldIndex = 0; fieldIndex < testDatasetRowFields.size(); fieldIndex++) { + Field testDatasetRowField = testDatasetRowFields.get(fieldIndex); + Field sinkDatasetRowField = sinkDatasetRowFields.get(fieldIndex); + Assertions.assertEquals( + testDatasetRowField.getObjectValue(testDatasetRowField.getDataType()), + sinkDatasetRowField.getObjectValue(sinkDatasetRowField.getDataType())); + } + } + } + + @AfterEach + public void closeIoTDBContainer() throws IoTDBConnectionException { + if (session != null) { + session.close(); + } + if (iotdbServer != null) { + iotdbServer.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf similarity index 50% rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf rename to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf index 4c5e0fe4e96..61fa0357d3d 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf @@ -20,34 +20,43 @@ env { # You can set flink configuration here - execution.parallelism = 1 + execution.parallelism = 2 job.mode = "BATCH" #execution.checkpoint.interval = 10000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } source { - FakeSource { + IoTDB { result_table_name = "fake" - schema = { - fields { - name = "string" - age = "int" - } + + node_urls = "flink_e2e_iotdb_sink:6667" + username = "root" + password = "root" + sql = "SELECT c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM root.source_group.* WHERE time < 4102329600000 align by device" + lower_bound = 1 + upper_bound = 4102329600000 + num_partitions = 10 + fields { + ts = timestamp + device_name = string + + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double } } - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } - transform { sql { - sql = "select * from (values('root.ln.d1', '1660147200000', 'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 'false,1002')) t (device, `timestamp`, measurements, `values`)" + sql = "SELECT 'root.sink_group.device_a' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, 
c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_a' UNION SELECT 'root.sink_group.device_b' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_b'" } - - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql } sink { @@ -55,10 +64,10 @@ sink { node_urls = ["flink_e2e_iotdb_sink:6667"] username = "root" password = "root" + key_device = "device_name" + key_timestamp = "ts" + key_measurement_fields = ["c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double"] batch_size = 1 batch_interval_ms = 10 } - - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/IoTDB } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java deleted file mode 100644 index 40a9a8ce6b1..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.spark.v2.iotdb; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.e2e.spark.SparkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.apache.iotdb.session.Session; -import org.apache.iotdb.session.SessionDataSet; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.Field; -import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class FakeSourceToIoTDBIT extends SparkContainer { - - private static final String IOTDB_DOCKER_IMAGE = "apache/iotdb:0.13.1-node"; - private static final String IOTDB_HOST = "spark_e2e_iotdb_sink"; - private static final int IOTDB_PORT = 6668; - private static final String IOTDB_USERNAME = "root"; - private static final String IOTDB_PASSWORD = "root"; - - private GenericContainer iotdbServer; - private Session session; - - 
@BeforeEach - public void startIoTDBContainer() throws Exception { - iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(IOTDB_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - iotdbServer.setPortBindings(Lists.newArrayList( - String.format("%s:6667", IOTDB_PORT))); - Startables.deepStart(Stream.of(iotdbServer)).join(); - log.info("IoTDB container started"); - // wait for IoTDB fully start - session = createSession(); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> session.open()); - initIoTDBTimeseries(); - } - - /** - * fake source -> IoTDB sink - */ - //@Test - public void testFakeSourceToIoTDB() throws Exception { - Container.ExecResult execResult = executeSeaTunnelSparkJob("/iotdb/fakesource_to_iotdb.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - // query result - SessionDataSet dataSet = session.executeQueryStatement("select status, value from root.ln.d1"); - List actual = new ArrayList<>(); - while (dataSet.hasNext()) { - RowRecord row = dataSet.next(); - List fields = row.getFields(); - Field status = fields.get(0); - Field val = fields.get(1); - actual.add(Arrays.asList(status.getBoolV(), val.getLongV())); - } - List expected = Arrays.asList( - Arrays.asList(Boolean.TRUE, Long.valueOf(1001)), - Arrays.asList(Boolean.FALSE, Long.valueOf(1002))); - Assertions.assertIterableEquals(expected, actual); - } - - private Session createSession() { - return new Session.Builder() - .host("localhost") - .port(IOTDB_PORT) - .username(IOTDB_USERNAME) - .password(IOTDB_PASSWORD) - .build(); - } - - private void initIoTDBTimeseries() throws Exception { - session.setStorageGroup("root.ln"); - session.createTimeseries("root.ln.d1.status", - TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY); - session.createTimeseries("root.ln.d1.value", - TSDataType.INT64, 
TSEncoding.PLAIN, CompressionType.SNAPPY); - } - - @AfterEach - public void closeIoTDBContainer() { - if (iotdbServer != null) { - iotdbServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java new file mode 100644 index 00000000000..5082559e444 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.spark.v2.iotdb; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class IoTDBIT extends SparkContainer { + + private static final String IOTDB_DOCKER_IMAGE = "apache/iotdb:0.13.1-node"; + private static final String IOTDB_HOST = "spark_e2e_iotdb_sink"; + private static final int IOTDB_PORT = 6668; + private static final String IOTDB_USERNAME = "root"; + private static final String IOTDB_PASSWORD = "root"; + private static final String SOURCE_GROUP = "root.source_group"; + private static final String SINK_GROUP = "root.sink_group"; + + private GenericContainer iotdbServer; + private Session session; + private List testDataset; + + @BeforeEach + 
public void startIoTDBContainer() throws Exception { + iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(IOTDB_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + iotdbServer.setPortBindings(Lists.newArrayList( + String.format("%s:6667", IOTDB_PORT))); + Startables.deepStart(Stream.of(iotdbServer)).join(); + log.info("IoTDB container started"); + // wait for IoTDB fully start + session = createSession(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> session.open()); + testDataset = generateTestDataSet(); + } + + //@Test + public void testFakeSourceToIoTDB() throws Exception { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/iotdb/iotdb_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + List sinkDataset = readSinkDataset(); + assertDatasetEquals(testDataset, sinkDataset); + } + + private Session createSession() { + return new Session.Builder() + .host("localhost") + .port(IOTDB_PORT) + .username(IOTDB_USERNAME) + .password(IOTDB_PASSWORD) + .build(); + } + + private List generateTestDataSet() throws IoTDBConnectionException, StatementExecutionException { + session.setStorageGroup(SOURCE_GROUP); + session.setStorageGroup(SINK_GROUP); + + String[] deviceIds = new String[] { + "device_a", + "device_b" + }; + LinkedHashMap measurements = new LinkedHashMap<>(); + measurements.put("c_string", TSDataType.TEXT); + measurements.put("c_boolean", TSDataType.BOOLEAN); + measurements.put("c_tinyint", TSDataType.INT32); + measurements.put("c_smallint", TSDataType.INT32); + measurements.put("c_int", TSDataType.INT32); + measurements.put("c_bigint", TSDataType.INT64); + measurements.put("c_float", TSDataType.FLOAT); + measurements.put("c_double", TSDataType.DOUBLE); + + List rowRecords = new ArrayList<>(); + for (String deviceId : deviceIds) { + String 
devicePath = String.format("%s.%s", SOURCE_GROUP, deviceId); + ArrayList measurementKeys = new ArrayList<>(measurements.keySet()); + for (String measurement : measurements.keySet()) { + session.createTimeseries(String.format("%s.%s", devicePath, measurement), + measurements.get(measurement), TSEncoding.PLAIN, CompressionType.SNAPPY); + session.createTimeseries(String.format("%s.%s.%s", SINK_GROUP, deviceId, measurement), + measurements.get(measurement), TSEncoding.PLAIN, CompressionType.SNAPPY); + } + + for (int rowCount = 0; rowCount < 100; rowCount++) { + long timestamp = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(rowCount); + RowRecord record = new RowRecord(timestamp); + record.addField(new Binary(deviceId), TSDataType.TEXT); + record.addField(Boolean.FALSE, TSDataType.BOOLEAN); + record.addField(Byte.valueOf(Byte.MAX_VALUE).intValue(), TSDataType.INT32); + record.addField(Short.valueOf(Short.MAX_VALUE).intValue(), TSDataType.INT32); + record.addField(Integer.valueOf(rowCount), TSDataType.INT32); + record.addField(Long.MAX_VALUE, TSDataType.INT64); + record.addField(Float.MAX_VALUE, TSDataType.FLOAT); + record.addField(Double.MAX_VALUE, TSDataType.DOUBLE); + rowRecords.add(record); + log.info("TestDataSet row: {}", record); + + session.insertRecord(devicePath, + record.getTimestamp(), + measurementKeys, + record.getFields().stream().map(f -> f.getDataType()).collect(Collectors.toList()), + record.getFields().stream().map(f -> f.getObjectValue(f.getDataType())).collect(Collectors.toList())); + + } + } + return rowRecords; + } + + private List readSinkDataset() throws IoTDBConnectionException, StatementExecutionException { + SessionDataSet dataSet = session.executeQueryStatement("SELECT c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM " + SINK_GROUP + ".* align by device"); + List results = new ArrayList<>(); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + List notContainDeviceField = 
record.getFields().stream() + .filter(field -> !field.getStringValue().startsWith(SINK_GROUP)) + .collect(Collectors.toList()); + record = new RowRecord(record.getTimestamp(), notContainDeviceField); + results.add(record); + log.info("SinkDataset row: {}", record); + } + return results; + } + + private void assertDatasetEquals(List testDataset, List sinkDataset) { + Assertions.assertEquals(testDataset.size(), sinkDataset.size()); + + Collections.sort(testDataset, Comparator.comparingLong(RowRecord::getTimestamp)); + Collections.sort(sinkDataset, Comparator.comparingLong(RowRecord::getTimestamp)); + for (int rowIndex = 0; rowIndex < testDataset.size(); rowIndex++) { + RowRecord testDatasetRow = testDataset.get(rowIndex); + RowRecord sinkDatasetRow = sinkDataset.get(rowIndex); + Assertions.assertEquals(testDatasetRow.getTimestamp(), sinkDatasetRow.getTimestamp()); + + List testDatasetRowFields = testDatasetRow.getFields(); + List sinkDatasetRowFields = sinkDatasetRow.getFields(); + Assertions.assertEquals(testDatasetRowFields.size(), sinkDatasetRowFields.size()); + for (int fieldIndex = 0; fieldIndex < testDatasetRowFields.size(); fieldIndex++) { + Field testDatasetRowField = testDatasetRowFields.get(fieldIndex); + Field sinkDatasetRowField = sinkDatasetRowFields.get(fieldIndex); + Assertions.assertEquals( + testDatasetRowField.getObjectValue(testDatasetRowField.getDataType()), + sinkDatasetRowField.getObjectValue(sinkDatasetRowField.getDataType())); + } + } + } + + @AfterEach + public void closeIoTDBContainer() throws IoTDBConnectionException { + if (session != null) { + session.close(); + } + if (iotdbServer != null) { + iotdbServer.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf index 503668487e4..3fb4636c9de 100644 --- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf @@ -26,27 +26,36 @@ env { } source { - FakeSource { + IoTDB { result_table_name = "fake" - schema = { - fields { - name = "string" - age = "int" - } + + node_urls = "spark_e2e_iotdb_sink:6668" + username = "root" + password = "root" + sql = "SELECT c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM root.source_group.* WHERE time < 4102329600000 align by device" + lower_bound = 1 + upper_bound = 4102329600000 + num_partitions = 10 + fields { + ts = timestamp + device_name = string + + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double } } - - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { sql { - sql = "select * from (values('root.ln.d1', '1660147200000', 'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 'false,1002')) t (device, `timestamp`, measurements, `values`)" + sql = "SELECT 'root.sink_group.device_a' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_a' UNION SELECT 'root.sink_group.device_b' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_b'" } - - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql } sink { @@ -54,10 +63,10 @@ sink { node_urls = 
["spark_e2e_iotdb_sink:6668"] username = "root" password = "root" + key_device = "device_name" + key_timestamp = "ts" + key_measurement_fields = ["c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double"] batch_size = 1 batch_interval_ms = 10 } - - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/IoTDB } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf new file mode 100644 index 00000000000..61fa0357d3d --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config: IoTDB source (root.source_group) -> sql transform -> IoTDB sink (root.sink_group) +###### + +env { + # You can set engine configuration here + execution.parallelism = 2 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + IoTDB { + result_table_name = "fake" + + node_urls = "spark_e2e_iotdb_sink:6667" + username = "root" + password = "root" + sql = "SELECT c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM root.source_group.* WHERE time < 4102329600000 align by device" + lower_bound = 1 + upper_bound = 4102329600000 + num_partitions = 10 + fields { + ts = timestamp + device_name = string + + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } +} + +transform { + sql { + sql = "SELECT 'root.sink_group.device_a' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_a' UNION SELECT 'root.sink_group.device_b' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_b'" + } +} + +sink { + IoTDB { + node_urls = ["spark_e2e_iotdb_sink:6667"] + username = "root" + password = "root" + key_device = "device_name" + key_timestamp = "ts" + key_measurement_fields = ["c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double"] + batch_size = 1 + batch_interval_ms = 10 + } +} \ No newline at end of file