From fd34fb7a141c4e4c55b074ad6be97d38fc663823 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Thu, 22 Jul 2021 13:54:32 -0500 Subject: [PATCH 1/2] feat: Update SUBSTRING function to accept BYTES types --- .../ksqldb-reference/scalar-functions.md | 7 +- .../io/confluent/ksql/util/BytesUtils.java | 15 +- .../io/confluent/ksql/util/BytesUtilTest.java | 44 +++++ .../ksql/function/udf/string/FromBytes.java | 5 +- .../ksql/function/udf/string/Substring.java | 56 +++++-- .../ksql/function/udf/string/ToBytes.java | 5 +- .../function/udf/string/SubstringTest.java | 61 ++++++- .../7.1.0_1626980273694/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980273694/spec.json | 136 ++++++++++++++++ .../7.1.0_1626980273694/topology | 13 ++ .../7.1.0_1626980274000/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980274000/spec.json | 140 ++++++++++++++++ .../7.1.0_1626980274000/topology | 13 ++ .../5.5.0_1581572103595/plan.json | 4 +- .../5.5.0_1581572103595/spec.json | 2 +- .../6.0.0_1588893945209/plan.json | 4 +- .../6.0.0_1589910891558/plan.json | 4 +- .../5.5.0_1581572103613/plan.json | 4 +- .../5.5.0_1581572103613/spec.json | 2 +- .../6.0.0_1588893945230/plan.json | 4 +- .../6.0.0_1588893945230/spec.json | 2 +- .../6.0.0_1589910891594/plan.json | 4 +- .../6.0.0_1589910891594/spec.json | 2 +- .../5.5.0_1581572103632/plan.json | 4 +- .../5.5.0_1581572103632/spec.json | 2 +- .../6.0.0_1588893945250/plan.json | 4 +- .../6.0.0_1588893945250/spec.json | 2 +- .../6.0.0_1589910891615/plan.json | 4 +- .../6.0.0_1589910891615/spec.json | 2 +- .../5.5.0_1581572103658/plan.json | 4 +- .../5.5.0_1581572103658/spec.json | 2 +- .../6.0.0_1588893945272/plan.json | 4 +- .../6.0.0_1588893945272/spec.json | 2 +- .../6.0.0_1589910891647/plan.json | 4 +- .../6.0.0_1589910891647/spec.json | 2 +- .../7.1.0_1626980269454/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980269454/spec.json | 126 ++++++++++++++ .../7.1.0_1626980269454/topology | 13 ++ .../7.1.0_1626980273601/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980273601/spec.json | 126 ++++++++++++++ .../7.1.0_1626980273601/topology | 13 ++ .../7.1.0_1626980273797/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980273797/spec.json | 129 +++++++++++++++ .../7.1.0_1626980273797/topology | 13 ++ .../7.1.0_1626980273891/plan.json | 154 ++++++++++++++++++ .../7.1.0_1626980273891/spec.json | 129 +++++++++++++++ .../7.1.0_1626980273891/topology | 13 ++ .../query-validation-tests/substring.json | 51 +++++- 48 files changed, 2035 insertions(+), 65 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/topology diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index 576f25e83eaa..bac03ab93b09 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1047,12 +1047,13 @@ SUBSTRING(col1, 2, 5) ```sql SUBSTRING(str, pos, [len]) +SUBSTRING(bytes, pos, [len]) ``` -Returns a substring of `str` that starts at -`pos` (first character is at position 1) and +Returns the portion of `str` or `bytes` that starts at +`pos` (first character or byte is at position 1) and has length `len`, or continues to the end of -the string. +the string or bytes. For example, `SUBSTRING("stream", 1, 4)` returns "stre". diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java index 8c9210bf3e8c..536bc9e0feca 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Map; import java.util.function.Function; @@ -91,9 +92,13 @@ public static byte[] getByteArray(final ByteBuffer buffer) { return null; } - // ByteBuffer.array() throws an exception if it is in read-only state. Protobuf usually - // returns ByteBuffer in read-only, so this util allows us to get the internal byte array. - if (buffer.isReadOnly()) { + // ByteBuffer.array() throws an exception if it is read-only or the array is null. + // Protobuf returns ByteBuffer as read-only, so this util allows us to get the internal + // byte array. + if (!buffer.hasArray()) { + // Reset internal array position to 0, which affects read-only buffers + buffer.clear(); + final byte[] internalByteArray = new byte[buffer.capacity()]; buffer.get(internalByteArray); return internalByteArray; @@ -102,6 +107,10 @@ public static byte[] getByteArray(final ByteBuffer buffer) { return buffer.array(); } + public static byte[] getByteArray(final ByteBuffer buffer, final int start, final int end) { + return Arrays.copyOfRange(getByteArray(buffer), start, end); + } + private static String hexEncoding(final byte[] value) { return BaseEncoding.base16().encode(value); } diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/util/BytesUtilTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/util/BytesUtilTest.java index 718ca8df9eb3..78659cf8dbd1 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/util/BytesUtilTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/util/BytesUtilTest.java @@ -46,4 +46,48 @@ public void shouldReturnByteArrayOnWritableByteBuffer() { // Then assertThat(bytes, is(new byte[]{5})); } + + @Test + public void shouldReturnFullByteArrayWhenByteBufferPositionIsNotZero() { + // Given + final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{5, 10, 15}).asReadOnlyBuffer(); + + // This moves the internal array position to the next element and affects when we get + // bytes from read-only buffers + buffer.get(); + + // When + final byte[] bytes = BytesUtils.getByteArray(buffer); + + // Then + assertThat(bytes, is(new byte[]{5, 10, 15})); + } + + @Test + public void shouldReturnSubArray() { + // Given + final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}); + + // When + final byte[] bytes = BytesUtils.getByteArray(buffer, 1, 3); + + // Then + assertThat(bytes, is(new byte[]{2, 3})); + } + + @Test + public void shouldReturnSubArrayWhenByteBufferPositionIsNotZero() { + // Given + final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}).asReadOnlyBuffer(); + + // This moves the internal array position to the next element and affects when we get + // bytes from read-only buffers + buffer.get(); + + // When + final byte[] bytes = BytesUtils.getByteArray(buffer, 1, 3); + + // Then + assertThat(bytes, is(new byte[]{2, 3})); + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/FromBytes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/FromBytes.java index 94c12db61e2d..59acc27b4512 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/FromBytes.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/FromBytes.java @@ -17,6 +17,7 @@ import io.confluent.ksql.function.FunctionCategory; import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.util.BytesUtils; import io.confluent.ksql.util.KsqlConstants; @@ -32,7 +33,9 @@ public class FromBytes { @Udf(description = "Converts a BYTES value to STRING in the specified encoding. " + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.") - public String fromBytes(final ByteBuffer value, final String encoding) { + public String fromBytes( + @UdfParameter(description = "The bytes value to convert.") final ByteBuffer value, + @UdfParameter(description = "The encoding to use on conversion.") final String encoding) { return (value == null) ? null : BytesUtils.encode(BytesUtils.getByteArray(value), encoding); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java index 894ccdf24df2..64a8bace11e1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java @@ -19,18 +19,21 @@ import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.BytesUtils; import io.confluent.ksql.util.KsqlConstants; +import java.nio.ByteBuffer; + @SuppressWarnings("unused") // Invoked via reflection. @UdfDescription( name = "substring", category = FunctionCategory.STRING, author = KsqlConstants.CONFLUENT_AUTHOR, - description = "Returns a substring of the passed in value." + description = "Returns the portion of the string or bytes passed in value." ) public class Substring { - @Udf(description = "Returns a substring of str from pos to the end of str") + @Udf(description = "Returns the portion of str from pos to the end of str") public String substring( @UdfParameter(description = "The source string.") final String str, @UdfParameter(description = "The base-one position to start from.") final Integer pos @@ -38,11 +41,11 @@ public String substring( if (str == null || pos == null) { return null; } - final int start = getStartIndex(str, pos); + final int start = getStartIndex(str.length(), pos); return str.substring(start); } - @Udf(description = "Returns a substring of str that starts at pos and is of length len") + @Udf(description = "Returns the portion of str that starts at pos and is of length len") public String substring( @UdfParameter(description = "The source string.") final String str, @UdfParameter(description = "The base-one position to start from.") final Integer pos, @@ -51,18 +54,49 @@ public String substring( if (str == null || pos == null || length == null) { return null; } - final int start = getStartIndex(str, pos); - final int end = getEndIndex(str, start, length); + final int start = getStartIndex(str.length(), pos); + final int end = getEndIndex(str.length(), start, length); return str.substring(start, end); } - private static int getStartIndex(final String value, final Integer pos) { + @Udf(description = "Returns the portion of the bytes value from pos to the end of the " + + "bytes value") + public ByteBuffer substring( + @UdfParameter(description = "The source bytes.") final ByteBuffer bytes, + @UdfParameter(description = "The base-one position to start from.") final Integer pos + ) { + if (bytes == null || pos == null) { + return null; + } + + final int start = getStartIndex(bytes.capacity(), pos); + final int end = bytes.capacity(); + return ByteBuffer.wrap(BytesUtils.getByteArray(bytes, start, end)); + } + + @Udf(description = "Returns the portion of the bytes value that starts at pos and is of " + + "length len") + public ByteBuffer substring( + @UdfParameter(description = "The source bytes.") final ByteBuffer bytes, + @UdfParameter(description = "The base-one position to start from.") final Integer pos, + @UdfParameter(description = "The length to extract.") final Integer length + ) { + if (bytes == null || pos == null) { + return null; + } + + final int start = getStartIndex(bytes.capacity(), pos); + final int end = getEndIndex(bytes.capacity(), start, length); + return ByteBuffer.wrap(BytesUtils.getByteArray(bytes, start, end)); + } + + private static int getStartIndex(final int valueLength, final Integer pos) { return pos < 0 - ? Math.max(value.length() + pos, 0) - : Math.max(Math.min(pos - 1, value.length()), 0); + ? Math.max(valueLength + pos, 0) + : Math.max(Math.min(pos - 1, valueLength), 0); } - private static int getEndIndex(final String value, final int start, final int length) { - return Math.max(Math.min(start + length, value.length()), start); + private static int getEndIndex(final int valueLength, final int start, final int length) { + return Math.max(Math.min(start + length, valueLength), start); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ToBytes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ToBytes.java index 7c36d6d96611..e6721822e37f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ToBytes.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ToBytes.java @@ -17,6 +17,7 @@ import io.confluent.ksql.function.FunctionCategory; import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.util.BytesUtils; import io.confluent.ksql.util.KsqlConstants; @@ -32,7 +33,9 @@ public class ToBytes { @Udf(description = "Converts a STRING value in the specified encoding to BYTES. " + "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.") - public ByteBuffer toBytes(final String value, final String encoding) { + public ByteBuffer toBytes( + @UdfParameter(description = "The string to convert.") final String value, + @UdfParameter(description = "The type of encoding.") final String encoding) { return (value == null) ? null : ByteBuffer.wrap(BytesUtils.decode(value, encoding)); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java index c78c45fa875d..b63463c1e394 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java @@ -22,6 +22,8 @@ import org.junit.Before; import org.junit.Test; +import java.nio.ByteBuffer; + public class SubstringTest { private Substring udf; @@ -32,33 +34,65 @@ public void setUp() { } @Test - public void shouldReturnNullOnNullValue() { - assertThat(udf.substring(null, 1), is(nullValue())); - assertThat(udf.substring(null, 1, 1), is(nullValue())); + public void shouldReturnNullOnStringNullValue() { + assertThat(udf.substring((String) null, 1), is(nullValue())); + assertThat(udf.substring((String) null, 1, 1), is(nullValue())); assertThat(udf.substring("some string", null, 1), is(nullValue())); assertThat(udf.substring("some string", 1, null), is(nullValue())); } @Test - public void shouldUseOneBasedIndexing() { + public void shouldReturnNullOnBytesNullValue() { + assertThat(udf.substring((ByteBuffer) null, 1), is(nullValue())); + assertThat(udf.substring((ByteBuffer) null, 1, 1), is(nullValue())); + assertThat(udf.substring("some string", null, 1), is(nullValue())); + assertThat(udf.substring("some string", 1, null), is(nullValue())); + } + + @Test + public void shouldUseOneBasedIndexingOnString() { assertThat(udf.substring("a test string", 1, 1), is("a")); assertThat(udf.substring("a test string", -1, 1), is("g")); } @Test - public void shouldExtractFromStartForPositivePositions() { + public void shouldUseOneBasedIndexingOnBytes() { + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 1, 1), + is(ByteBuffer.wrap(new byte[]{1}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -1, 1), + is(ByteBuffer.wrap(new byte[]{4}))); + } + + @Test + public void shouldExtractFromStartForPositivePositionsOnStrings() { assertThat(udf.substring("a test string", 3), is("test string")); assertThat(udf.substring("a test string", 3, 4), is("test")); } @Test - public void shouldExtractFromEndForNegativePositions() { + public void shouldExtractFromStartForPositivePositionsOnBytes() { + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3), + is(ByteBuffer.wrap(new byte[]{3,4}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, 4), + is(ByteBuffer.wrap(new byte[]{3,4}))); + } + + @Test + public void shouldExtractFromEndForNegativePositionsOnStrings() { assertThat(udf.substring("a test string", -6), is("string")); assertThat(udf.substring("a test string", -6, 2), is("st")); } @Test - public void shouldTruncateOutOfBoundIndexes() { + public void shouldExtractFromEndForNegativePositionsOnBytes() { + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -3), + is(ByteBuffer.wrap(new byte[]{2,3,4}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -3, 3), + is(ByteBuffer.wrap(new byte[]{2,3,4}))); + } + + @Test + public void shouldTruncateOutOfBoundIndexesOnStrings() { assertThat(udf.substring("a test string", 0), is("a test string")); assertThat(udf.substring("a test string", 100), is("")); assertThat(udf.substring("a test string", -100), is("a test string")); @@ -66,4 +100,17 @@ public void shouldTruncateOutOfBoundIndexes() { assertThat(udf.substring("a test string", 3, -100), is("")); } + @Test + public void shouldTruncateOutOfBoundIndexesOnBytes() { + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 0), + is(ByteBuffer.wrap(new byte[]{1,2,3,4}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 100), + is(ByteBuffer.wrap(new byte[]{}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -100), + is(ByteBuffer.wrap(new byte[]{1,2,3,4}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, 100), + is(ByteBuffer.wrap(new byte[]{3,4}))); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, -100), + is(ByteBuffer.wrap(new byte[]{}))); + } } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/plan.json new file mode 100644 index 000000000000..4ea2fee56ab4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE BYTES) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` BYTES" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/spec.json new file mode 100644 index 000000000000..73dfb003479d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/spec.json @@ -0,0 +1,136 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980273694, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with bytes with just pos - JSON", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "c29tZV9zdHJpbmc=" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "YW5vdGhlcg==" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "c2hvcnQ=" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "c3RyaW5n", + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "ZXI=", + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : null, + "NULL_POS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_just_pos_-_JSON/7.1.0_1626980273694/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/plan.json new file mode 100644 index 000000000000..30516c7f1f4d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE BYTES) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES, `NULL_LEN` BYTES", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` BYTES" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/spec.json new file mode 100644 index 000000000000..4fb69664af98 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/spec.json @@ -0,0 +1,140 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980274000, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES, `NULL_LEN` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with bytes with pos and length - JSON", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "c29tZV9zdHJpbmc=" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "YW5vdGhlcg==" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "c2hvcnQ=" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "c3Ry", + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "ZXI=", + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` BYTES, `NULL_POS` BYTES, `NULL_LEN` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_bytes_with_pos_and_length_-_JSON/7.1.0_1626980274000/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/plan.json index 724e2d4960de..5b875ae3db70 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/plan.json @@ -25,7 +25,7 @@ "queryPlan" : null }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -79,7 +79,7 @@ "timestampColumn" : null, "sourceSchema" : "`ROWKEY` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/spec.json index 0842fe6b5bc5..ee675cf91207 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/5.5.0_1581572103595/spec.json @@ -77,7 +77,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES;" ], + "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1588893945209/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1588893945209/plan.json index 4e2230cedf4d..f53b89a3ce03 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1588893945209/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1588893945209/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -62,7 +62,7 @@ }, "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1589910891558/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1589910891558/plan.json index 41ae9b980446..c0f561d0b245 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1589910891558/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_JSON/6.0.0_1589910891558/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -63,7 +63,7 @@ "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, "keyColumnNames" : [ "K" ], - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/plan.json index 724e2d4960de..5b875ae3db70 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/plan.json @@ -25,7 +25,7 @@ "queryPlan" : null }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -79,7 +79,7 @@ "timestampColumn" : null, "sourceSchema" : "`ROWKEY` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/spec.json index 4d84822dac7b..3d53314327ca 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/5.5.0_1581572103613/spec.json @@ -77,7 +77,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES;" ], + "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/plan.json index 4e2230cedf4d..f53b89a3ce03 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -62,7 +62,7 @@ }, "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/spec.json index 968f5a297ec5..9440ff7936dd 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1588893945230/spec.json @@ -77,7 +77,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/plan.json index 41ae9b980446..c0f561d0b245 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -63,7 +63,7 @@ "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, "keyColumnNames" : [ "K" ], - "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/spec.json index 56a0aeba4351..cdd71fa8af07 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_just_pos_-_PROTOBUF/6.0.0_1589910891594/spec.json @@ -77,7 +77,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/plan.json index cca0b7567077..43fa45015096 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/plan.json @@ -25,7 +25,7 @@ "queryPlan" : null }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -79,7 +79,7 @@ "timestampColumn" : null, "sourceSchema" : "`ROWKEY` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/spec.json index 27a508ea1361..be628688ad57 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/5.5.0_1581572103632/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES;" ], + "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/plan.json index 85dcd234422b..64ba148fdfc8 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -62,7 +62,7 @@ }, "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/spec.json index e90e47787dfc..77057560732f 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1588893945250/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/plan.json index 326a68b3673c..477dcb9443c3 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -63,7 +63,7 @@ "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, "keyColumnNames" : [ "K" ], - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/spec.json index e2cb55d89ca6..13a2c4bbb985 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_JSON/6.0.0_1589910891615/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/plan.json index cca0b7567077..43fa45015096 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/plan.json @@ -25,7 +25,7 @@ "queryPlan" : null }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -79,7 +79,7 @@ "timestampColumn" : null, "sourceSchema" : "`ROWKEY` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/spec.json index 57747e58c905..cd6d4936c06b 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/5.5.0_1581572103658/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES;" ], + "statements" : [ "CREATE STREAM TEST (SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", "CREATE STREAM OUTPUT AS SELECT\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/plan.json index 85dcd234422b..64ba148fdfc8 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -62,7 +62,7 @@ }, "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/spec.json index 692394c895c1..46a8b4192b69 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1588893945272/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/plan.json index 326a68b3673c..477dcb9443c3 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/plan.json @@ -18,7 +18,7 @@ } }, { "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(null, 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", "ddlCommand" : { "@type" : "createStreamV1", "sourceName" : "OUTPUT", @@ -63,7 +63,7 @@ "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" }, "keyColumnNames" : [ "K" ], - "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(null, 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] }, "formats" : { "keyFormat" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/spec.json index b227c390b922..8e197aca2e06 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_pos_and_length_-_PROTOBUF/6.0.0_1589910891647/spec.json @@ -80,7 +80,7 @@ "replicas" : 1, "numPartitions" : 4 } ], - "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], "post" : { "sources" : [ { "name" : "OUTPUT", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/plan.json new file mode 100644 index 000000000000..59df894fd231 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/spec.json new file mode 100644 index 000000000000..e5a698271f8f --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/spec.json @@ -0,0 +1,126 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980269454, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with strings with just pos - JSON", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "some_string" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "another" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "short" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "string", + "NULL_STR" : null, + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "er", + "NULL_STR" : null, + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_STR" : null, + "NULL_POS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_JSON/7.1.0_1626980269454/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/plan.json new file mode 100644 index 000000000000..59df894fd231 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/spec.json new file mode 100644 index 000000000000..43b71ea2f136 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/spec.json @@ -0,0 +1,126 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980273601, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with strings with just pos - PROTOBUF", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "some_string" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "another" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "short" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "string", + "NULL_STR" : null, + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "er", + "NULL_STR" : null, + "NULL_POS" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_STR" : null, + "NULL_POS" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_just_pos_-_PROTOBUF/7.1.0_1626980273601/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/plan.json new file mode 100644 index 000000000000..2ff7c156fa95 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/spec.json new file mode 100644 index 000000000000..c24c27af807f --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/spec.json @@ -0,0 +1,129 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980273797, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with strings with pos and length - JSON", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "some_string" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "another" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "short" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "str", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "er", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_JSON/7.1.0_1626980273797/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/plan.json new file mode 100644 index 000000000000..2ff7c156fa95 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, SOURCE STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n SUBSTRING(TEST.SOURCE, 6, 3) SUBSTRING,\n SUBSTRING(CAST(null AS STRING), 1) NULL_STR,\n SUBSTRING(TEST.SOURCE, null) NULL_POS,\n SUBSTRING(TEST.SOURCE, 6, null) NULL_LEN\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `SOURCE` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "SUBSTRING(SOURCE, 6, 3) AS SUBSTRING", "SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR", "SUBSTRING(SOURCE, null) AS NULL_POS", "SUBSTRING(SOURCE, 6, null) AS NULL_LEN" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/spec.json new file mode 100644 index 000000000000..f6a10c283b48 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/spec.json @@ -0,0 +1,129 @@ +{ + "version" : "7.1.0", + "timestamp" : 1626980273891, + "path" : "query-validation-tests/substring.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "do substring with strings with pos and length - PROTOBUF", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "some_string" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "another" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "source" : "short" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "str", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "er", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "SUBSTRING" : "", + "NULL_STR" : null, + "NULL_POS" : null, + "NULL_LEN" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SUBSTRING` STRING, `NULL_STR` STRING, `NULL_POS` STRING, `NULL_LEN` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `SOURCE` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/substring_-_do_substring_with_strings_with_pos_and_length_-_PROTOBUF/7.1.0_1626980273891/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/substring.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/substring.json index 46ebbe2b0b82..21fc5e00839f 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/substring.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/substring.json @@ -3,15 +3,16 @@ "Tests covering the use of the SUBSTRING function.", "NOTE: At present the SUBSTRING function has two modes: current and legacy.", " v5.0 and earlier only have legacy mode.", - " v5.1 and later have both current and legacy. New queries default to the current mode" + " v5.1 and later have both current and legacy. New queries default to the current mode", + "NOTE: Inputs for bytes tests are same inputs for strings but base64 encoded" ], "tests": [ { - "name": "do substring with just pos", + "name": "do substring with strings with just pos", "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" + "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" ], "inputs": [ {"topic": "test_topic", "value": {"source": "some_string"}}, @@ -25,11 +26,31 @@ ] }, { - "name": "do substring with pos and length", + "name": "do substring with bytes with just pos", + "format": ["JSON"], + "statements": [ + "CREATE STREAM TEST (K STRING KEY, source BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(source, null) AS NULL_POS FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"source": "c29tZV9zdHJpbmc="}}, + {"topic": "test_topic", "value": {"source": "YW5vdGhlcg=="}}, + {"topic": "test_topic", "value": {"source": "c2hvcnQ="}}, + {"topic": "test_topic", "value": {"source": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"SUBSTRING":"c3RyaW5n", "NULL_POS":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":"ZXI=", "NULL_POS":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":"", "NULL_POS":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":null, "NULL_POS":null}} + ] + }, + { + "name": "do substring with strings with pos and length", "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (K STRING KEY, source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" + "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(CAST(null AS STRING), 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], "inputs": [ {"topic": "test_topic", "value": {"source": "some_string"}}, @@ -42,6 +63,26 @@ {"topic": "OUTPUT", "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_POS":null, "NULL_LEN":null}} ] }, + { + "name": "do substring with bytes with pos and length", + "format": ["JSON"], + "statements": [ + "CREATE STREAM TEST (K STRING KEY, source BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"source": "c29tZV9zdHJpbmc="}}, + {"topic": "test_topic", "value": {"source": "YW5vdGhlcg=="}}, + {"topic": "test_topic", "value": {"source": "c2hvcnQ="}}, + {"topic": "test_topic", "value": {"source": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"SUBSTRING":"c3Ry", "NULL_POS":null, "NULL_LEN":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":"ZXI=", "NULL_POS":null, "NULL_LEN":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":"", "NULL_POS":null, "NULL_LEN":null}}, + {"topic": "OUTPUT", "value": {"SUBSTRING":null, "NULL_POS":null, "NULL_LEN":null}} + ] + }, { "name": "should default to current mode for new queries", "format": ["JSON", "PROTOBUF"], From 53b30eb03af3a5dd1c8db046d442a25b303bb2eb Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Thu, 22 Jul 2021 16:44:46 -0500 Subject: [PATCH 2/2] fix: fix test --- ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java | 4 ++-- .../java/io/confluent/ksql/function/udf/string/Substring.java | 2 +- .../io/confluent/ksql/function/udf/string/SubstringTest.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 60add3c48f5c..536223b5658b 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -1039,7 +1039,7 @@ public void shouldDescribeOverloadedScalarFunction() { assertThat(output, containsString( "Name : SUBSTRING\n" + "Author : Confluent\n" - + "Overview : Returns a substring of the passed in value.\n" + + "Overview : Returns the portion of the string or bytes passed in value.\n" )); assertThat(output, containsString( "Type : SCALAR\n" @@ -1051,7 +1051,7 @@ public void shouldDescribeOverloadedScalarFunction() { assertThat(output, containsString( "\tVariation : SUBSTRING(str VARCHAR, pos INT)\n" + "\tReturns : VARCHAR\n" - + "\tDescription : Returns a substring of str from pos to the end of str" + + "\tDescription : Returns the portion of str from pos to the end of str" )); assertThat(output, containsString( "\tstr : The source string.\n" diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java index 64a8bace11e1..4224418f6dbd 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java @@ -81,7 +81,7 @@ public ByteBuffer substring( @UdfParameter(description = "The base-one position to start from.") final Integer pos, @UdfParameter(description = "The length to extract.") final Integer length ) { - if (bytes == null || pos == null) { + if (bytes == null || pos == null || length == null) { return null; } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java index b63463c1e394..4466f9b41a11 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java @@ -45,8 +45,8 @@ public void shouldReturnNullOnStringNullValue() { public void shouldReturnNullOnBytesNullValue() { assertThat(udf.substring((ByteBuffer) null, 1), is(nullValue())); assertThat(udf.substring((ByteBuffer) null, 1, 1), is(nullValue())); - assertThat(udf.substring("some string", null, 1), is(nullValue())); - assertThat(udf.substring("some string", 1, null), is(nullValue())); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1}), null, 1), is(nullValue())); + assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1}), 1, null), is(nullValue())); } @Test