diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md index d3d93bd2bb7..665605b33d4 100644 --- a/docs/en/connector-v2/source/FakeSource.md +++ b/docs/en/connector-v2/source/FakeSource.md @@ -18,18 +18,43 @@ just for some test cases such as type conversion or connector new feature testin ## Options -| name | type | required | default value | -|---------------------|--------|----------|---------------| -| schema | config | yes | - | -| rows | config | no | - | -| row.num | int | no | 5 | -| split.num | int | no | 1 | -| split.read-interval | long | no | 1 | -| map.size | int | no | 5 | -| array.size | int | no | 5 | -| bytes.length | int | no | 5 | -| string.length | int | no | 5 | -| common-options | | no | - | +| name | type | required | default value | +|---------------------|----------|----------|-------------------------| +| schema | config | yes | - | +| rows | config | no | - | +| row.num | int | no | 5 | +| split.num | int | no | 1 | +| split.read-interval | long | no | 1 | +| map.size | int | no | 5 | +| array.size | int | no | 5 | +| bytes.length | int | no | 5 | +| string.length | int | no | 5 | +| string.fake.mode | string | no | range | +| tinyint.fake.mode | string | no | range | +| tinyint.min | tinyint | no | 0 | +| tinyint.max | tinyint | no | 127 | +| tinyint.template | list | no | - | +| smallint.fake.mode | string | no | range | +| smallint.min | smallint | no | 0 | +| smallint.max | smallint | no | 32767 | +| smallint.template | list | no | - | +| int.fake.template | string | no | range | +| int.min | int | no | 0 | +| int.max | int | no | 0x7fffffff | +| int.template | list | no | - | +| bigint.fake.mode | string | no | range | +| bigint.min | bigint | no | 0 | +| bigint.max | bigint | no | 0x7fffffffffffffff | +| bigint.template | list | no | - | +| float.fake.mode | string | no | range | +| float.min | float | no | 0 | +| float.max | float | no | 0x1.fffffeP+127 | +| float.template | list | no | - | +| double.fake.mode | string | no | range | +| double.min | double | no | 0 | +| double.max | double | no | 0x1.fffffffffffffP+1023 | +| double.template | list | no | - | +| common-options | | no | - | ### schema [config] @@ -133,6 +158,110 @@ The length of `bytes` type that connector generated The length of `string` type that connector generated +### string.fake.mode + +The fake mode of generating string data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `string.template` option + +### string.template + +The template list of string type that connector generated, if user configured it, connector will randomly select an item from the template list + +### tinyint.fake.mode + +The fake mode of generating tinyint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `tinyint.template` option + +### tinyint.min + +The min value of tinyint data that connector generated + +### tinyint.max + +The max value of tinyint data that connector generated + +### tinyint.template + +The template list of tinyint type that connector generated, if user configured it, connector will randomly select an item from the template list + +### smallint.fake.mode + +The fake mode of generating smallint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `smallint.template` option + +### smallint.min + +The min value of smallint data that connector generated + +### smallint.max + +The max value of smallint data that connector generated + +### smallint.template + +The template list of smallint type that connector generated, if user configured it, connector will randomly select an item from the template list + +### int.fake.mode + +The fake mode of generating int data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `int.template` option + +### int.min + +The min value of int data that connector generated + +### int.max + +The max value of int data that connector generated + +### int.template + +The template list of int type that connector generated, if user configured it, connector will randomly select an item from the template list + +### bigint.fake.mode + +The fake mode of generating bigint data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `bigint.template` option + +### bigint.min + +The min value of bigint data that connector generated + +### bigint.max + +The max value of bigint data that connector generated + +### bigint.template + +The template list of bigint type that connector generated, if user configured it, connector will randomly select an item from the template list + +### float.fake.mode + +The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `float.template` option + +### float.min + +The min value of float data that connector generated + +### float.max + +The max value of float data that connector generated + +### float.template + +The template list of float type that connector generated, if user configured it, connector will randomly select an item from the template list + +### double.fake.mode + +The fake mode of generating float data, support `range` and `template`, default `range`,if use configured it to `template`, user should also configured `double.template` option + +### double.min + +The min value of double data that connector generated + +### double.max + +The max value of double data that connector generated + +### double.template + +The template list of double type that connector generated, if user configured it, connector will randomly select an item from the template list + ### common options Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details @@ -227,6 +356,71 @@ FakeSource { } ``` +Using template + +```hocon +FakeSource { + row.num = 5 + string.fake.mode = "template" + string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"] + tinyint.fake.mode = "template" + tinyint.template = [1, 2, 3, 4, 5, 6, 7, 8, 9] + smalling.fake.mode = "template" + smallint.template = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + int.fake.mode = "template" + int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29] + bigint.fake.mode = "template" + bigint.template = [30, 31, 32, 33, 34, 35, 36, 37, 38, 39] + float.fake.mode = "template" + float.template = [40.0, 41.0, 42.0, 43.0] + double.fake.mode = "template" + double.template = [44.0, 45.0, 46.0, 47.0] + schema { + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } +} +``` + +Use range + +```hocon +FakeSource { + row.num = 5 + string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"] + tinyint.min = 1 + tinyint.max = 9 + smallint.min = 10 + smallint.max = 19 + int.min = 20 + int.max = 29 + bigint.min = 30 + bigint.max = 39 + float.min = 40.0 + float.max = 43.0 + double.min = 44.0 + double.max = 47.0 + schema { + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 @@ -246,4 +440,5 @@ FakeSource { ### next version -- [Feature] Support config fake data rows [3865](https://github.com/apache/incubator-seatunnel/pull/3865) \ No newline at end of file +- [Feature] Support config fake data rows [3865](https://github.com/apache/incubator-seatunnel/pull/3865) +- [Feature] Support config template or range for fake data [3932](https://github.com/apache/incubator-seatunnel/pull/3932) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java index 035b755c7b6..5804ed02729 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java @@ -18,13 +18,48 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_DAY_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_MONTH_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_YEAR_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_MAX; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_MIN; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_TEMPLATE; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; @@ -42,18 +77,98 @@ public class FakeConfig implements Serializable { @Builder.Default private int rowNum = ROW_NUM.defaultValue(); + @Builder.Default private int splitNum = SPLIT_NUM.defaultValue(); + @Builder.Default private int splitReadInterval = SPLIT_READ_INTERVAL.defaultValue(); + @Builder.Default private int mapSize = MAP_SIZE.defaultValue(); + @Builder.Default private int arraySize = ARRAY_SIZE.defaultValue(); + @Builder.Default private int bytesLength = BYTES_LENGTH.defaultValue(); + @Builder.Default private int stringLength = STRING_LENGTH.defaultValue(); + + @Builder.Default + private int tinyintMin = TINYINT_MIN.defaultValue(); + + @Builder.Default + private int tinyintMax = TINYINT_MAX.defaultValue(); + + @Builder.Default + private int smallintMin = SMALLINT_MIN.defaultValue(); + + @Builder.Default + private int smallintMax = SMALLINT_MAX.defaultValue(); + + @Builder.Default + private int intMin = INT_MIN.defaultValue(); + + @Builder.Default + private int intMax = INT_MAX.defaultValue(); + + @Builder.Default + private long bigintMin = BIGINT_MIN.defaultValue(); + + @Builder.Default + private long bigintMax = BIGINT_MAX.defaultValue(); + + @Builder.Default + private double floatMin = FLOAT_MIN.defaultValue(); + + @Builder.Default + private double floatMax = FLOAT_MAX.defaultValue(); + + @Builder.Default + private double doubleMin = DOUBLE_MIN.defaultValue(); + + @Builder.Default + private double doubleMax = DOUBLE_MAX.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode stringFakeMode = STRING_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode tinyintFakeMode = TINYINT_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode smallintFakeMode = SMALLINT_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode intFakeMode = INT_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode bigintFakeMode = BIGINT_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode floatFakeMode = FLOAT_FAKE_MODE.defaultValue(); + + @Builder.Default + private FakeOption.FakeMode doubleFakeMode = DOUBLE_FAKE_MODE.defaultValue(); + + private List stringTemplate; + private List tinyintTemplate; + private List smallintTemplate; + private List intTemplate; + private List bigTemplate; + private List floatTemplate; + private List doubleTemplate; + + private List dateYearTemplate; + private List dateMonthTemplate; + private List dateDayTemplate; + + private List timeHourTemplate; + private List timeMinuteTemplate; + private List timeSecondTemplate; + private List fakeRows; public static FakeConfig buildWithConfig(Config config) { @@ -90,6 +205,174 @@ public static FakeConfig buildWithConfig(Config config) { } builder.fakeRows(rows); } + if (config.hasPath(STRING_TEMPLATE.key())) { + builder.stringTemplate(config.getStringList(STRING_TEMPLATE.key())); + } + if (config.hasPath(TINYINT_TEMPLATE.key())) { + builder.tinyintTemplate(config.getIntList(TINYINT_TEMPLATE.key())); + } + if (config.hasPath(SMALLINT_TEMPLATE.key())) { + builder.smallintTemplate(config.getIntList(SMALLINT_TEMPLATE.key())); + } + if (config.hasPath(INT_TEMPLATE.key())) { + builder.intTemplate(config.getIntList(INT_TEMPLATE.key())); + } + if (config.hasPath(BIGINT_TEMPLATE.key())) { + builder.bigTemplate(config.getLongList(BIGINT_TEMPLATE.key())); + } + if (config.hasPath(FLOAT_TEMPLATE.key())) { + builder.floatTemplate(config.getDoubleList(FLOAT_TEMPLATE.key())); + } + if (config.hasPath(DOUBLE_TEMPLATE.key())) { + builder.doubleTemplate(config.getDoubleList(DOUBLE_TEMPLATE.key())); + } + if (config.hasPath(DATE_YEAR_TEMPLATE.key())) { + builder.dateYearTemplate(config.getIntList(DATE_YEAR_TEMPLATE.key())); + } + if (config.hasPath(DATE_MONTH_TEMPLATE.key())) { + builder.dateMonthTemplate(config.getIntList(DATE_MONTH_TEMPLATE.key())); + } + if (config.hasPath(DATE_DAY_TEMPLATE.key())) { + builder.dateDayTemplate(config.getIntList(DATE_DAY_TEMPLATE.key())); + } + if (config.hasPath(TIME_HOUR_TEMPLATE.key())) { + builder.timeHourTemplate(config.getIntList(TIME_HOUR_TEMPLATE.key())); + } + if (config.hasPath(TIME_MINUTE_TEMPLATE.key())) { + builder.timeMinuteTemplate(config.getIntList(TIME_MINUTE_TEMPLATE.key())); + } + if (config.hasPath(TIME_SECOND_TEMPLATE.key())) { + builder.timeSecondTemplate(config.getIntList(TIME_SECOND_TEMPLATE.key())); + } + if (config.hasPath(TINYINT_MIN.key())) { + int tinyintMin = config.getInt(TINYINT_MIN.key()); + if (tinyintMin < TINYINT_MIN.defaultValue() || tinyintMin > TINYINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + TINYINT_MIN.key() + " should >= " + TINYINT_MIN.defaultValue() + + " and <= " + TINYINT_MAX.defaultValue()); + } + builder.tinyintMin(tinyintMin); + } + if (config.hasPath(TINYINT_MAX.key())) { + int tinyintMax = config.getInt(TINYINT_MAX.key()); + if (tinyintMax < TINYINT_MIN.defaultValue() || tinyintMax > TINYINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + TINYINT_MAX.key() + " should >= " + TINYINT_MIN.defaultValue() + + " and <= " + TINYINT_MAX.defaultValue()); + } + builder.tinyintMax(tinyintMax); + } + if (config.hasPath(SMALLINT_MIN.key())) { + int smallintMin = config.getInt(SMALLINT_MIN.key()); + if (smallintMin < SMALLINT_MIN.defaultValue() || smallintMin > SMALLINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + SMALLINT_MIN.key() + " should >= " + SMALLINT_MIN.defaultValue() + + " and <= " + SMALLINT_MAX.defaultValue()); + } + builder.smallintMin(smallintMin); + } + if (config.hasPath(SMALLINT_MAX.key())) { + int smallintMax = config.getInt(SMALLINT_MAX.key()); + if (smallintMax < SMALLINT_MIN.defaultValue() || smallintMax > SMALLINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + SMALLINT_MAX.key() + " should >= " + SMALLINT_MIN.defaultValue() + + " and <= " + SMALLINT_MAX.defaultValue()); + } + builder.smallintMax(smallintMax); + } + if (config.hasPath(INT_MIN.key())) { + int intMin = config.getInt(INT_MIN.key()); + if (intMin < INT_MIN.defaultValue() || intMin > INT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + INT_MIN.key() + " should >= " + INT_MIN.defaultValue() + + " and <= " + INT_MAX.defaultValue()); + } + builder.intMin(intMin); + } + if (config.hasPath(INT_MAX.key())) { + int intMax = config.getInt(INT_MAX.key()); + if (intMax < INT_MIN.defaultValue() || intMax > INT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + INT_MAX.key() + " should >= " + INT_MIN.defaultValue() + + " and <= " + INT_MAX.defaultValue()); + } + builder.intMax(intMax); + } + if (config.hasPath(BIGINT_MIN.key())) { + long bigintMin = config.getLong(BIGINT_MIN.key()); + if (bigintMin < BIGINT_MIN.defaultValue() || bigintMin > BIGINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + BIGINT_MIN.key() + " should >= " + BIGINT_MIN.defaultValue() + + " and <= " + BIGINT_MAX.defaultValue()); + } + builder.bigintMin(bigintMin); + } + if (config.hasPath(BIGINT_MAX.key())) { + long bigintMax = config.getLong(BIGINT_MAX.key()); + if (bigintMax < BIGINT_MIN.defaultValue() || bigintMax > BIGINT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + BIGINT_MAX.key() + " should >= " + BIGINT_MIN.defaultValue() + + " and <= " + BIGINT_MAX.defaultValue()); + } + builder.bigintMax(bigintMax); + } + if (config.hasPath(FLOAT_MIN.key())) { + double floatMin = config.getDouble(FLOAT_MIN.key()); + if (floatMin < FLOAT_MIN.defaultValue() || floatMin > FLOAT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + FLOAT_MIN.key() + " should >= " + FLOAT_MIN.defaultValue() + + " and <= " + FLOAT_MAX.defaultValue()); + } + builder.floatMin(floatMin); + } + if (config.hasPath(FLOAT_MAX.key())) { + double floatMax = config.getDouble(FLOAT_MAX.key()); + if (floatMax < FLOAT_MIN.defaultValue() || floatMax > FLOAT_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + FLOAT_MAX.key() + " should >= " + FLOAT_MIN.defaultValue() + + " and <= " + FLOAT_MAX.defaultValue()); + } + builder.floatMax(floatMax); + } + if (config.hasPath(DOUBLE_MIN.key())) { + double doubleMin = config.getDouble(DOUBLE_MIN.key()); + if (doubleMin < DOUBLE_MIN.defaultValue() || doubleMin > DOUBLE_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + DOUBLE_MIN.key() + " should >= " + DOUBLE_MIN.defaultValue() + + " and <= " + DOUBLE_MAX.defaultValue()); + } + builder.doubleMin(doubleMin); + } + if (config.hasPath(DOUBLE_MAX.key())) { + double doubleMax = config.getDouble(DOUBLE_MAX.key()); + if (doubleMax < DOUBLE_MIN.defaultValue() || doubleMax > DOUBLE_MAX.defaultValue()) { + throw new FakeConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + DOUBLE_MAX.key() + " should >= " + DOUBLE_MIN.defaultValue() + + " and <= " + DOUBLE_MAX.defaultValue()); + } + builder.doubleMax(doubleMax); + } + if (config.hasPath(STRING_FAKE_MODE.key())) { + builder.stringFakeMode(FakeOption.FakeMode.parse(config.getString(STRING_FAKE_MODE.key()))); + } + if (config.hasPath(TINYINT_FAKE_MODE.key())) { + builder.tinyintFakeMode(FakeOption.FakeMode.parse(config.getString(TINYINT_FAKE_MODE.key()))); + } + if (config.hasPath(SMALLINT_FAKE_MODE.key())) { + builder.smallintFakeMode(FakeOption.FakeMode.parse(config.getString(SMALLINT_FAKE_MODE.key()))); + } + if (config.hasPath(INT_FAKE_MODE.key())) { + builder.intFakeMode(FakeOption.FakeMode.parse(config.getString(INT_FAKE_MODE.key()))); + } + if (config.hasPath(BIGINT_FAKE_MODE.key())) { + builder.bigintFakeMode(FakeOption.FakeMode.parse(config.getString(BIGINT_FAKE_MODE.key()))); + } + if (config.hasPath(FLOAT_FAKE_MODE.key())) { + builder.floatFakeMode(FakeOption.FakeMode.parse(config.getString(FLOAT_FAKE_MODE.key()))); + } + if (config.hasPath(DOUBLE_FAKE_MODE.key())) { + builder.doubleFakeMode(FakeOption.FakeMode.parse(config.getString(DOUBLE_FAKE_MODE.key()))); + } return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java index c50ed00782e..9c123cdf3d0 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java @@ -26,21 +26,206 @@ @SuppressWarnings("checkstyle:MagicNumber") public class FakeOption { - public static final Option> ROWS = Options.key("rows").listType(SeaTunnelRow.class).noDefaultValue() - .withDescription("The row list of fake data output per degree of parallelism"); - public static final Option ROW_NUM = Options.key("row.num").intType().defaultValue(5) - .withDescription("The total number of data generated per degree of parallelism"); - public static final Option SPLIT_NUM = Options.key("split.num").intType().defaultValue(1) - .withDescription("The number of splits generated by the enumerator for each degree of parallelism"); - public static final Option SPLIT_READ_INTERVAL = Options.key("split.read-interval").intType().defaultValue(1) - .withDescription("The interval(mills) between two split reads in a reader"); - public static final Option MAP_SIZE = Options.key("map.size").intType().defaultValue(5) - .withDescription("The size of map type that connector generated"); - public static final Option ARRAY_SIZE = Options.key("array.size").intType().defaultValue(5) - .withDescription("The size of array type that connector generated"); - public static final Option BYTES_LENGTH = Options.key("bytes.length").intType().defaultValue(5) - .withDescription("The length of bytes type that connector generated"); - public static final Option STRING_LENGTH = Options.key("string.length").intType().defaultValue(5) - .withDescription("The length of string type that connector generated"); + public static final Option> ROWS = Options.key("rows") + .listType(SeaTunnelRow.class) + .noDefaultValue() + .withDescription("The row list of fake data output per degree of parallelism"); + public static final Option ROW_NUM = Options.key("row.num") + .intType() + .defaultValue(5) + .withDescription("The total number of data generated per degree of parallelism"); + public static final Option SPLIT_NUM = Options.key("split.num") + .intType() + .defaultValue(1) + .withDescription("The number of splits generated by the enumerator for each degree of parallelism"); + public static final Option SPLIT_READ_INTERVAL = Options.key("split.read-interval") + .intType() + .defaultValue(1) + .withDescription("The interval(mills) between two split reads in a reader"); + public static final Option MAP_SIZE = Options.key("map.size") + .intType() + .defaultValue(5) + .withDescription("The size of map type that connector generated"); + public static final Option ARRAY_SIZE = Options.key("array.size") + .intType() + .defaultValue(5) + .withDescription("The size of array type that connector generated"); + public static final Option BYTES_LENGTH = Options.key("bytes.length") + .intType() + .defaultValue(5) + .withDescription("The length of bytes type that connector generated"); + public static final Option STRING_LENGTH = Options.key("string.length") + .intType() + .defaultValue(5) + .withDescription("The length of string type that connector generated"); + public static final Option> STRING_TEMPLATE = Options.key("string.template") + .listType() + .noDefaultValue() + .withDescription("The template list of string type that connector generated, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> TINYINT_TEMPLATE = Options.key("tinyint.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of tinyint type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> SMALLINT_TEMPLATE = Options.key("smallint.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of smallint type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> INT_TEMPLATE = Options.key("int.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of int type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> BIGINT_TEMPLATE = Options.key("bigint.template") + .listType(Long.class) + .noDefaultValue() + .withDescription("The template list of bigint type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> FLOAT_TEMPLATE = Options.key("float.template") + .listType(Double.class) + .noDefaultValue() + .withDescription("The template list of float type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> DOUBLE_TEMPLATE = Options.key("double.template") + .listType(Double.class) + .noDefaultValue() + .withDescription("The template list of double type, if user configured it, connector will randomly select an item from the template list"); + + public static final Option> DATE_YEAR_TEMPLATE = Options.key("date.year.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of year of date like 'yyyy', if user configured it, connector will randomly select an item from the template list"); + + public static final Option> DATE_MONTH_TEMPLATE = Options.key("date.month.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of month of date like 'MM', if user configured it, connector will randomly select an item from the template list"); + + public static final Option> DATE_DAY_TEMPLATE = Options.key("date.day.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of day of date like 'dd', if user configured it, connector will randomly select an item from the template list"); + + public static final Option> TIME_HOUR_TEMPLATE = Options.key("time.hour.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of hour of time like 'HH', if user configured it, connector will randomly select an item from the template list"); + + public static final Option> TIME_MINUTE_TEMPLATE = Options.key("time.minute.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of minute of time like 'mm', if user configured it, connector will randomly select an item from the template list"); + + public static final Option> TIME_SECOND_TEMPLATE = Options.key("time.second.template") + .listType(Integer.class) + .noDefaultValue() + .withDescription("The template list of second of time like 'ss', if user configured it, connector will randomly select an item from the template list"); + + public static final Option TINYINT_MIN = Options.key("tinyint.min") + .intType() + .defaultValue(0) + .withDescription("The min value of tinyint type data"); + + public static final Option TINYINT_MAX = Options.key("tinyint.max") + .intType() + .defaultValue((int) Byte.MAX_VALUE) + .withDescription("The min value of tinyint type data"); + + public static final Option SMALLINT_MIN = Options.key("smallint.min") + .intType() + .defaultValue(0) + .withDescription("The min value of smallint type data"); + + public static final Option SMALLINT_MAX = Options.key("smallint.max") + .intType() + .defaultValue((int) Short.MAX_VALUE) + .withDescription("The max value of smallint type data"); + + public static final Option INT_MIN = Options.key("int.min") + .intType() + .defaultValue(0) + .withDescription("The min value of int type data"); + + public static final Option INT_MAX = Options.key("int.max") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("The max value of int type data"); + + public static final Option BIGINT_MIN = Options.key("bigint.min") + .longType() + .defaultValue(0L) + .withDescription("The min value of bigint type data"); + + public static final Option BIGINT_MAX = Options.key("bigint.max") + .longType() + .defaultValue(Long.MAX_VALUE) + .withDescription("The max value of bigint type data"); + + public static final Option FLOAT_MIN = Options.key("float.min") + .floatType() + .defaultValue(0F) + .withDescription("The min value of float type data"); + + public static final Option FLOAT_MAX = Options.key("float.max") + .floatType() + .defaultValue(Float.MAX_VALUE) + .withDescription("The max value of float type data"); + + public static final Option DOUBLE_MIN = Options.key("double.min") + .doubleType() + .defaultValue(0D) + .withDescription("The min value of double type data"); + + public static final Option DOUBLE_MAX = Options.key("double.max") + .doubleType() + .defaultValue(Double.MAX_VALUE) + .withDescription("The max value of double type data"); + + public static final Option STRING_FAKE_MODE = Options.key("string.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating string data"); + + public static final Option TINYINT_FAKE_MODE = Options.key("tinyint.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating tinyint data"); + + + public static final Option SMALLINT_FAKE_MODE = Options.key("smallint.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating smallint data"); + + public static final Option INT_FAKE_MODE = Options.key("int.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating int data"); + + public static final Option BIGINT_FAKE_MODE = Options.key("bigint.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating bigint data"); + + public static final Option FLOAT_FAKE_MODE = Options.key("float.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating float data"); + + public static final Option DOUBLE_FAKE_MODE = Options.key("double.fake.mode") + .enumType(FakeMode.class) + .defaultValue(FakeMode.RANGE) + .withDescription("The fake mode of generating double data"); + + public enum FakeMode { + RANGE, + TEMPLATE; + + public static FakeMode parse(String s) { + return FakeMode.valueOf(s.toUpperCase()); + } + } } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java index 5848bf3d67e..3f44bf82153 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java @@ -29,24 +29,20 @@ import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; +import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils; import org.apache.seatunnel.format.json.JsonDeserializationSchema; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.RandomUtils; - import java.io.IOException; import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; public class FakeDataGenerator { - public static final String SCHEMA = "schema"; private final SeaTunnelSchema schema; private final FakeConfig fakeConfig; private final JsonDeserializationSchema jsonDeserializationSchema; + private final FakeDataRandomUtils fakeDataRandomUtils; public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) { this.schema = schema; @@ -55,6 +51,7 @@ public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) { null : new JsonDeserializationSchema( false, false, schema.getSeaTunnelRowType()); + this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig); } private SeaTunnelRow convertRow(FakeConfig.RowData rowData) { @@ -121,35 +118,34 @@ private Object randomColumnValue(SeaTunnelDataType fieldType) { } return objectMap; case STRING: - return RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength()); + return fakeDataRandomUtils.randomString(); case BOOLEAN: - return RandomUtils.nextInt(0, 2) == 1; + return fakeDataRandomUtils.randomBoolean(); case TINYINT: - return (byte) RandomUtils.nextInt(0, 255); + return fakeDataRandomUtils.randomTinyint(); case SMALLINT: - return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE); + return fakeDataRandomUtils.randomSmallint(); case INT: - return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE); + return fakeDataRandomUtils.randomInt(); case BIGINT: - return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE); + return fakeDataRandomUtils.randomBigint(); case FLOAT: - return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE); + return fakeDataRandomUtils.randomFloat(); case DOUBLE: - return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE); + return fakeDataRandomUtils.randomDouble(); case DECIMAL: DecimalType decimalType = (DecimalType) fieldType; - return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." + - RandomStringUtils.randomNumeric(decimalType.getScale())); + return fakeDataRandomUtils.randomBigDecimal(decimalType.getPrecision(), decimalType.getScale()); case NULL: return null; case BYTES: - return RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes(); + return fakeDataRandomUtils.randomBytes(); case DATE: - return randomLocalDateTime().toLocalDate(); + return fakeDataRandomUtils.randomLocalDate(); case TIME: - return randomLocalDateTime().toLocalTime(); + return fakeDataRandomUtils.randomLocalTime(); case TIMESTAMP: - return randomLocalDateTime(); + return fakeDataRandomUtils.randomLocalDateTime(); case ROW: SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes(); Object[] objects = new Object[fieldTypes.length]; @@ -164,15 +160,4 @@ private Object randomColumnValue(SeaTunnelDataType fieldType) { "SeaTunnel Fake source connector not support this data type"); } } - - @SuppressWarnings("magicnumber") - private LocalDateTime randomLocalDateTime() { - return LocalDateTime.of( - LocalDateTime.now().getYear(), - RandomUtils.nextInt(1, 12), - RandomUtils.nextInt(1, 28), - RandomUtils.nextInt(0, 24), - RandomUtils.nextInt(0, 59) - ); - } } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java index f4822ea56ed..a0726d6cb9e 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java @@ -18,19 +18,39 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BIGINT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_DAY_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_MONTH_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DATE_YEAR_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.DOUBLE_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.FLOAT_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.INT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SMALLINT_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_FAKE_MODE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TINYINT_TEMPLATE; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption; import com.google.auto.service.AutoService; @@ -44,17 +64,32 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(SeaTunnelSchema.SCHEMA) - .optional( - ROWS, - ROW_NUM, - SPLIT_NUM, - SPLIT_READ_INTERVAL, - MAP_SIZE, - ARRAY_SIZE, - BYTES_LENGTH, - STRING_LENGTH) - .build(); + .required(SeaTunnelSchema.SCHEMA) + .optional(STRING_FAKE_MODE) + .conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, STRING_TEMPLATE) + .optional(TINYINT_FAKE_MODE) + .conditional(TINYINT_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, TINYINT_TEMPLATE) + .optional(SMALLINT_FAKE_MODE) + .conditional(SMALLINT_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, SMALLINT_TEMPLATE) + .optional(INT_FAKE_MODE) + .conditional(INT_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, INT_TEMPLATE) + .optional(BIGINT_FAKE_MODE) + .conditional(BIGINT_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, BIGINT_TEMPLATE) + .optional(FLOAT_FAKE_MODE) + .conditional(FLOAT_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, FLOAT_TEMPLATE) + .optional(DOUBLE_FAKE_MODE) + .conditional(DOUBLE_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, DOUBLE_TEMPLATE) + .optional( + ROWS, + ROW_NUM, + SPLIT_NUM, + SPLIT_READ_INTERVAL, + MAP_SIZE, + ARRAY_SIZE, + BYTES_LENGTH, + DATE_YEAR_TEMPLATE, DATE_MONTH_TEMPLATE, DATE_DAY_TEMPLATE, + TIME_HOUR_TEMPLATE, TIME_MINUTE_TEMPLATE, TIME_SECOND_TEMPLATE) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java index 04144834b26..3c703825e4b 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -21,8 +21,7 @@ import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; @@ -33,9 +32,8 @@ import java.util.Map; import java.util.Set; +@Slf4j public class FakeSourceSplitEnumerator implements SourceSplitEnumerator { - - private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class); private final SourceSplitEnumerator.Context enumeratorContext; private final Map> pendingSplits; @@ -102,7 +100,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { private void discoverySplits() { Set allSplit = new HashSet<>(); - LOG.info("Starting to calculate splits."); + log.info("Starting to calculate splits."); int numReaders = enumeratorContext.currentParallelism(); int readerRowNum = fakeConfig.getRowNum(); int splitNum = fakeConfig.getSplitNum(); @@ -116,8 +114,8 @@ private void discoverySplits() { assignedSplits.forEach(allSplit::remove); addSplitChangeToPendingAssignments(allSplit); - LOG.debug("Assigned {} to {} readers.", allSplit, numReaders); - LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size()); + log.debug("Assigned {} to {} readers.", allSplit, numReaders); + log.info("Calculated splits successfully, the size of splits is {}.", allSplit.size()); } private void addSplitChangeToPendingAssignments(Collection newSplits) { @@ -139,7 +137,7 @@ private void assignPendingSplits() { // Mark pending splits as already assigned assignedSplits.addAll(pendingAssignmentForReader); // Assign pending splits to reader - LOG.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader); + log.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader); enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader)); enumeratorContext.signalNoMoreSplits(pendingReader); } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java new file mode 100644 index 00000000000..7ad5eedc66c --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.utils; + +import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.List; + +public class FakeDataRandomUtils { + private final FakeConfig fakeConfig; + + public FakeDataRandomUtils(FakeConfig fakeConfig) { + this.fakeConfig = fakeConfig; + } + + private static T randomFromList(List list) { + int index = RandomUtils.nextInt(0, list.size() - 1); + return list.get(index); + } + + public Boolean randomBoolean() { + return RandomUtils.nextInt(0, 2) == 1; + } + + public BigDecimal randomBigDecimal(int precision, int scale) { + return new BigDecimal(RandomStringUtils.randomNumeric(precision - scale) + "." + + RandomStringUtils.randomNumeric(scale)); + } + + public byte[] randomBytes() { + return RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes(); + } + + public String randomString() { + List stringTemplate = fakeConfig.getStringTemplate(); + if (!CollectionUtils.isEmpty(stringTemplate)) { + return randomFromList(stringTemplate); + } + return RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength()); + } + + public Byte randomTinyint() { + List tinyintTemplate = fakeConfig.getTinyintTemplate(); + if (!CollectionUtils.isEmpty(tinyintTemplate)) { + return randomFromList(tinyintTemplate).byteValue(); + } + return (byte) RandomUtils.nextInt(fakeConfig.getTinyintMin(), fakeConfig.getTinyintMax()); + } + + public Short randomSmallint() { + List smallintTemplate = fakeConfig.getSmallintTemplate(); + if (!CollectionUtils.isEmpty(smallintTemplate)) { + return randomFromList(smallintTemplate).shortValue(); + } + return (short) RandomUtils.nextInt(fakeConfig.getSmallintMin(), fakeConfig.getSmallintMax()); + } + + public Integer randomInt() { + List intTemplate = fakeConfig.getIntTemplate(); + if (!CollectionUtils.isEmpty(intTemplate)) { + return randomFromList(intTemplate); + } + return RandomUtils.nextInt(fakeConfig.getIntMin(), fakeConfig.getIntMax()); + } + + public Long randomBigint() { + List bigTemplate = fakeConfig.getBigTemplate(); + if (!CollectionUtils.isEmpty(bigTemplate)) { + return randomFromList(bigTemplate); + } + return RandomUtils.nextLong(fakeConfig.getBigintMin(), fakeConfig.getBigintMax()); + } + + public Float randomFloat() { + List floatTemplate = fakeConfig.getFloatTemplate(); + if (!CollectionUtils.isEmpty(floatTemplate)) { + return randomFromList(floatTemplate).floatValue(); + } + return RandomUtils.nextFloat((float) fakeConfig.getFloatMin(), (float) fakeConfig.getFloatMax()); + } + + public Double randomDouble() { + List doubleTemplate = fakeConfig.getDoubleTemplate(); + if (!CollectionUtils.isEmpty(doubleTemplate)) { + return randomFromList(doubleTemplate); + } + return RandomUtils.nextDouble(fakeConfig.getDoubleMin(), fakeConfig.getDoubleMax()); + } + + public LocalDate randomLocalDate() { + return randomLocalDateTime().toLocalDate(); + } + + public LocalTime randomLocalTime() { + return randomLocalDateTime().toLocalTime(); + } + + @SuppressWarnings("checkstyle:MagicNumber") + public LocalDateTime randomLocalDateTime() { + int year; + int month; + int day; + int hour; + int minute; + int second; + // init year + if (!CollectionUtils.isEmpty(fakeConfig.getDateYearTemplate())) { + year = randomFromList(fakeConfig.getDateYearTemplate()); + } else { + year = LocalDateTime.now().getYear(); + } + // init month + if (!CollectionUtils.isEmpty(fakeConfig.getDateMonthTemplate())) { + month = randomFromList(fakeConfig.getDateMonthTemplate()); + } else { + month = RandomUtils.nextInt(1, 13); + } + // init day + if (!CollectionUtils.isEmpty(fakeConfig.getDateDayTemplate())) { + day = randomFromList(fakeConfig.getDateDayTemplate()); + } else { + day = RandomUtils.nextInt(1, 29); + } + // init hour + if (!CollectionUtils.isEmpty(fakeConfig.getTimeHourTemplate())) { + hour = randomFromList(fakeConfig.getTimeHourTemplate()); + } else { + hour = RandomUtils.nextInt(0, 24); + } + // init minute + if (!CollectionUtils.isEmpty(fakeConfig.getTimeMinuteTemplate())) { + minute = randomFromList(fakeConfig.getTimeMinuteTemplate()); + } else { + minute = RandomUtils.nextInt(0, 60); + } + // init second + if (!CollectionUtils.isEmpty(fakeConfig.getTimeSecondTemplate())) { + second = randomFromList(fakeConfig.getTimeSecondTemplate()); + } else { + second = RandomUtils.nextInt(0, 60); + } + return LocalDateTime.of(year, month, day, hour, minute, second); + } +} diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/FakeFactoryTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeFactoryTest.java similarity index 88% rename from seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/FakeFactoryTest.java rename to seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeFactoryTest.java index 175fc623080..6dd59e7c39f 100644 --- a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/FakeFactoryTest.java +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeFactoryTest.java @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.fake; - -import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceFactory; +package org.apache.seatunnel.connectors.seatunnel.fake.source; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java index 51d4decce0b..9df613f6d34 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java @@ -31,5 +31,9 @@ public class FakeIT extends TestSuiteBase { public void testFakeConnector(TestContainer container) throws IOException, InterruptedException { Container.ExecResult textWriteResult = container.executeJob("/fake_to_assert.conf"); Assertions.assertEquals(0, textWriteResult.getExitCode()); + Container.ExecResult fakeWithRange = container.executeJob("/fake_to_assert_with_range.conf"); + Assertions.assertEquals(0, fakeWithRange.getExitCode()); + Container.ExecResult fakeWithTemplate = container.executeJob("/fake_to_assert_with_template.conf"); + Assertions.assertEquals(0, fakeWithTemplate.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf new file mode 100644 index 00000000000..2045fe67726 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf @@ -0,0 +1,165 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + spark.master = local +} + +source { + FakeSource { + row.num = 5 + string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"] + tinyint.min = 1 + tinyint.max = 9 + smallint.min = 10 + smallint.max = 19 + int.min = 20 + int.max = 29 + bigint.min = 30 + bigint.max = 39 + float.min = 40.0 + float.max = 43.0 + double.min = 44.0 + double.max = 47.0 + schema { + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } + } +} + +sink { + Assert { + rules { + rule_ruls = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = MIN_LENGTH + rule_value = 4 + }, + { + rule_type = MAX_LENGTH + rule_value = 13 + } + ] + }, + { + field_name = c_tinyint + field_type = byte + field_value = [ + { + rule_type = MIN + rule_value = 1 + }, + { + rule_type = MAX + rule_value = 9 + } + ] + }, + { + field_name = c_smallint + field_type = short + field_value = [ + { + rule_type = MIN + rule_value = 10 + }, + { + rule_type = MAX + rule_value = 19 + } + ] + }, + { + field_name = c_int + field_type = int + field_value = [ + { + rule_type = MIN + rule_value = 20 + }, + { + rule_type = MAX + rule_value = 29 + } + ] + }, + { + field_name = c_bigint + field_type = long + field_value = [ + { + rule_type = MIN + rule_value = 30 + }, + { + rule_type = MAX + rule_value = 39 + } + ] + }, + { + field_name = c_float + field_type = float + field_value = [ + { + rule_type = MIN + rule_value = 40 + }, + { + rule_type = MAX + rule_value = 43 + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = MIN + rule_value = 44 + }, + { + rule_type = MAX + rule_value = 47 + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf new file mode 100644 index 00000000000..41d449a8b98 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf @@ -0,0 +1,166 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + spark.master = local +} + +source { + FakeSource { + row.num = 5 + string.fake.mode = "template" + string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"] + tinyint.fake.mode = "template" + tinyint.template = [1, 2, 3, 4, 5, 6, 7, 8, 9] + smalling.fake.mode = "template" + smallint.template = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + int.fake.mode = "template" + int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29] + bigint.fake.mode = "template" + bigint.template = [30, 31, 32, 33, 34, 35, 36, 37, 38, 39] + float.fake.mode = "template" + float.template = [40.0, 41.0, 42.0, 43.0] + double.fake.mode = "template" + double.template = [44.0, 45.0, 46.0, 47.0] + schema { + fields { + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + } + } + } +} + +sink { + Assert { + rules { + rule_ruls = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = MIN_LENGTH + rule_value = 4 + }, + { + rule_type = MAX_LENGTH + rule_value = 13 + } + ] + }, + { + field_name = c_tinyint + field_type = byte + field_value = [ + { + rule_type = MIN + rule_value = 1 + }, + { + rule_type = MAX + rule_value = 9 + } + ] + }, + { + field_name = c_smallint + field_type = short + field_value = [ + { + rule_type = MIN + rule_value = 10 + }, + { + rule_type = MAX + rule_value = 19 + } + ] + }, + { + field_name = c_int + field_type = int + field_value = [ + { + rule_type = MIN + rule_value = 20 + }, + { + rule_type = MAX + rule_value = 29 + } + ] + }, + { + field_name = c_bigint + field_type = long + field_value = [ + { + rule_type = MIN + rule_value = 30 + }, + { + rule_type = MAX + rule_value = 39 + } + ] + }, + { + field_name = c_float + field_type = float + field_value = [ + { + rule_type = MIN + rule_value = 40 + }, + { + rule_type = MAX + rule_value = 43 + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = MIN + rule_value = 44 + }, + { + rule_type = MAX + rule_value = 47 + } + ] + } + ] + } + } +} \ No newline at end of file