From e249c9ccd924bd573d5d9d5dac61cf46a87fd136 Mon Sep 17 00:00:00 2001
From: Zara Lim
Date: Tue, 27 Jul 2021 01:35:07 -0700
Subject: [PATCH] feat: enables BYTES for CONCAT and CONCAT_WS (#7876)

---
 .../ksqldb-reference/scalar-functions.md      |   5 +-
 .../ksql/function/udf/string/Concat.java      |  27 ++-
 .../ksql/function/udf/string/ConcatWS.java    |  33 +++-
 .../ksql/function/udf/string/ConcatTest.java  |  18 +-
 .../function/udf/string/ConcatWSTest.java     |  49 ++++--
 .../7.1.0_1627194645370/plan.json             | 154 ++++++++++++++++++
 .../7.1.0_1627194645370/spec.json             | 110 +++++++++++++
 .../7.1.0_1627194645370/topology              |  13 ++
 .../7.1.0_1627194645291/plan.json             | 154 ++++++++++++++++++
 .../7.1.0_1627194645291/spec.json             | 144 ++++++++++++++++
 .../7.1.0_1627194645291/topology              |  13 ++
 .../7.1.0_1627194645238/plan.json             | 154 ++++++++++++++++++
 .../7.1.0_1627194645238/spec.json             | 144 ++++++++++++++++
 .../7.1.0_1627194645238/topology              |  13 ++
 .../query-validation-tests/concat.json        |  37 ++++-
 15 files changed, 1046 insertions(+), 22 deletions(-)
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/plan.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/spec.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/topology
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/plan.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/spec.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/topology
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/plan.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/spec.json
 create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/topology

diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md
index bac03ab93b09..18d6a50d0bc0 100644
--- a/docs/developer-guide/ksqldb-reference/scalar-functions.md
+++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md
@@ -650,9 +650,10 @@ Since: -
 
 ```sql
 CONCAT(col1, col2, 'hello', ..., col-n)
+CONCAT(bytes1, bytes2, ..., bytes-n)
 ```
 
-Concatenate two or more string expressions. Any input strings which evaluate to NULL are replaced with empty string in the output.
+Concatenate two or more string or bytes expressions. Any inputs which evaluate to NULL are replaced with an empty string or bytes in the output.
 
 ### `CONCAT_WS`
 
@@ -662,7 +663,7 @@ Since: 0.10.0
 CONCAT_WS(separator, expr1, expr2, ...)
 ```
 
-Concatenates two or more string expressions, inserting a separator string between each.
+Concatenates two or more string or bytes expressions, inserting a separator string or bytes between each.
 
 If the separator is NULL, this function returns NULL. Any expressions which
 evaluate to NULL are skipped.
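The documentation change above describes the SQL-level behavior; the new Java overloads added in `Concat.java` and `ConcatWS.java` below can also be exercised directly as plain unit-style code to see the same NULL-handling rules. The following is a minimal sketch only: the `ConcatBytesDemo` class name, the `readable` helper, and the literal byte values are illustrative and not part of this patch, while the `Concat`/`ConcatWS` classes and their `concat`/`concatWS` bytes overloads are the ones introduced here.

```java
import io.confluent.ksql.function.udf.string.Concat;
import io.confluent.ksql.function.udf.string.ConcatWS;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConcatBytesDemo {

  public static void main(final String[] args) {
    final Concat concat = new Concat();
    final ConcatWS concatWs = new ConcatWS();

    // CONCAT drops NULL inputs and joins the rest in order: prints [1, 2, 3].
    final ByteBuffer joined = concat.concat(
        ByteBuffer.wrap(new byte[] {1}),
        null,
        ByteBuffer.wrap(new byte[] {2, 3}));
    System.out.println(Arrays.toString(readable(joined)));

    // CONCAT_WS treats the first argument as the separator and skips NULL
    // values after it: prints [1, 127, 2, 3].
    final ByteBuffer delimited = concatWs.concatWS(
        ByteBuffer.wrap(new byte[] {127}),   // separator (illustrative value)
        ByteBuffer.wrap(new byte[] {1}),
        null,
        ByteBuffer.wrap(new byte[] {2, 3}));
    System.out.println(Arrays.toString(readable(delimited)));
  }

  // Copy the readable bytes out of a buffer so its contents can be printed.
  private static byte[] readable(final ByteBuffer buffer) {
    final byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return bytes;
  }
}
```

Note that the bytes overload of `Concat.concat` sizes its output from each non-null input's `capacity()` and calls `rewind()` before returning, so callers receive a buffer positioned at zero and ready to read.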
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Concat.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Concat.java
index c3523cf9bf60..0aa34862f496 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Concat.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Concat.java
@@ -19,6 +19,7 @@
 import io.confluent.ksql.function.udf.Udf;
 import io.confluent.ksql.function.udf.UdfDescription;
 import io.confluent.ksql.function.udf.UdfParameter;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -26,7 +27,7 @@
 @UdfDescription(
     name = "concat",
     category = FunctionCategory.STRING,
-    description = "Concatenate an arbitrary number of string fields together")
+    description = "Concatenate an arbitrary number of string or bytes fields together")
 public class Concat {
 
   @Udf
@@ -41,4 +42,28 @@ public String concat(@UdfParameter(
         .collect(Collectors.joining());
   }
 
+  @Udf
+  public ByteBuffer concat(@UdfParameter(
+      description = "The bytes fields to concatenate") final ByteBuffer... inputs) {
+    if (inputs == null) {
+      return null;
+    }
+
+    int capacity = 0;
+
+    for (final ByteBuffer bytes : inputs) {
+      if (Objects.nonNull(bytes)) {
+        capacity += bytes.capacity();
+      }
+    }
+
+    final ByteBuffer concatenated = ByteBuffer.allocate(capacity);
+    Arrays.stream(inputs)
+        .filter(Objects::nonNull)
+        .forEachOrdered(bytes -> concatenated.put(bytes));
+
+    concatenated.rewind();
+    return concatenated;
+  }
+
 }
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ConcatWS.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ConcatWS.java
index 420c944ac563..58ae846fd3ea 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ConcatWS.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/ConcatWS.java
@@ -19,17 +19,22 @@
 import io.confluent.ksql.function.udf.Udf;
 import io.confluent.ksql.function.udf.UdfDescription;
 import io.confluent.ksql.function.udf.UdfParameter;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 @UdfDescription(
     name = "concat_ws",
     category = FunctionCategory.STRING,
-    description = "Concatenate several strings, inserting a separator string passed as the "
+    description = "Concatenate several strings or bytes, inserting a separator passed as the "
         + "first argument between each one.")
 public class ConcatWS {
 
+  private static final Concat CONCAT = new Concat();
+
   @Udf
   public String concatWS(
       @UdfParameter(description = "Separator string and values to join") final String... inputs) {
@@ -47,4 +52,30 @@ public String concatWS(
         .filter(Objects::nonNull)
         .collect(Collectors.joining(separator));
   }
+
+  @Udf
+  public ByteBuffer concatWS(
+      @UdfParameter(description = "Separator and bytes values to join")
+      final ByteBuffer... inputs) {
+    if (inputs == null || inputs.length < 2) {
+      throw new KsqlFunctionException("Function Concat_WS expects at least two input arguments.");
+    }
+
+    final ByteBuffer separator = inputs[0];
+    if (separator == null) {
+      return null;
+    }
+
+    final List<ByteBuffer> concatInputs = new ArrayList<>();
+    for (int i = 1; i < inputs.length; i++) {
+      if (Objects.nonNull(inputs[i])) {
+        if (concatInputs.size() != 0) {
+          concatInputs.add(separator.duplicate());
+        }
+        concatInputs.add(inputs[i]);
+      }
+    }
+
+    return CONCAT.concat(concatInputs.toArray(new ByteBuffer[0]));
+  }
 }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatTest.java
index d02cb5592ce2..165c631f0287 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatTest.java
@@ -18,6 +18,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
+import java.nio.ByteBuffer;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -35,25 +36,36 @@ public void shouldConcatStrings() {
     assertThat(udf.concat("The", "Quick", "Brown", "Fox"), is("TheQuickBrownFox"));
   }
 
+  @Test
+  public void shouldConcatBytes() {
+    assertThat(udf.concat(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), ByteBuffer.wrap(new byte[] {3})),
+        is(ByteBuffer.wrap(new byte[] {1, 2, 3})));
+  }
+
   @Test
   public void shouldIgnoreNullInputs() {
     assertThat(udf.concat(null, "this ", null, "should ", null, "work!", null),
         is("this should work!"));
+    assertThat(udf.concat(null, ByteBuffer.wrap(new byte[] {1}), null, ByteBuffer.wrap(new byte[] {2}), null),
+        is(ByteBuffer.wrap(new byte[] {1, 2})));
   }
 
   @Test
-  public void shouldReturnEmptyStringIfAllInputsNull() {
-    assertThat(udf.concat(null, null), is(""));
+  public void shouldReturnEmptyIfAllInputsNull() {
+    assertThat(udf.concat((String) null, null), is(""));
+    assertThat(udf.concat((ByteBuffer) null, null), is(ByteBuffer.wrap(new byte[] {})));
   }
 
   @Test
   public void shouldReturnSingleInput() {
     assertThat(udf.concat("singular"), is("singular"));
+    assertThat(udf.concat(ByteBuffer.wrap(new byte[] {2})), is(ByteBuffer.wrap(new byte[] {2})));
   }
 
   @Test
-  public void shouldReturnEmptyStringForSingleNullInput() {
+  public void shouldReturnEmptyForSingleNullInput() {
     assertThat(udf.concat((String) null), is(""));
+    assertThat(udf.concat((ByteBuffer) null), is(ByteBuffer.wrap(new byte[] {})));
   }
 
 }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatWSTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatWSTest.java
index f20b74e809cf..210d2ba724d9 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatWSTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/ConcatWSTest.java
@@ -22,12 +22,14 @@
 import io.confluent.ksql.function.KsqlFunctionException;
 import io.confluent.ksql.util.KsqlException;
 
+import java.nio.ByteBuffer;
 import org.junit.Before;
 import org.junit.Test;
 
 public class ConcatWSTest {
 
   private ConcatWS udf;
+  private static final ByteBuffer EMPTY_BYTES = ByteBuffer.wrap(new byte[] {});
 
   @Before
   public void setUp() {
@@ -39,6 +41,12 @@ public void shouldConcatStrings() {
     assertThat(udf.concatWS(" ", "The", "Quick", "Brown", "Fox"), is("The Quick Brown Fox"));
   }
 
+  @Test
+  public void shouldConcatBytes() {
+    assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), ByteBuffer.wrap(new byte[] {3})),
+        is(ByteBuffer.wrap(new byte[] {2, 1, 3})));
+  }
+
   @Test
   public void shouldConcatLongerSeparator() {
     final String result = udf.concatWS("SEP", "foo", "bar", "baz");
@@ -48,28 +56,30 @@ public void shouldConcatLongerSeparator() {
   @Test
   public void shouldReturnSingleInputUnchanged() {
     assertThat(udf.concatWS("SEP", "singular"), is("singular"));
+    assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})), is(ByteBuffer.wrap(new byte[] {2})));
   }
 
   @Test
   public void shouldReturnNullForNullSeparator() {
-    final Object result = udf.concatWS(null, "foo", "bar");
-    assertThat(result, is(nullValue()));
+    assertThat(udf.concatWS(null, "foo", "bar"), is(nullValue()));
+    assertThat(udf.concatWS(null, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})), is(nullValue()));
   }
 
   @Test
-  public void shouldReturnEmptyStringIfAllInputsNull() {
-    final Object result = udf.concatWS("SEP", null, null);
-    assertThat(result, is(""));
+  public void shouldReturnEmptyIfAllInputsNull() {
+    assertThat(udf.concatWS("SEP", null, null), is(""));
+    assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {2}), null, null), is(EMPTY_BYTES));
   }
 
   @Test
   public void shouldSkipAnyNullInputs() {
-    final Object result = udf.concatWS("SEP", "foo", null, "bar");
-    assertThat(result, is("fooSEPbar"));
+    assertThat(udf.concatWS("SEP", "foo", null, "bar"), is("fooSEPbar"));
+    assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), null, ByteBuffer.wrap(new byte[] {3})),
+        is(ByteBuffer.wrap(new byte[] {2, 1, 3})));
   }
 
   @Test
-  public void shouldFailIfOnlySeparatorInput() {
+  public void shouldFailIfOnlySeparatorStringInput() {
     // When:
     final KsqlException e = assertThrows(KsqlFunctionException.class, () -> udf.concatWS("SEP"));
 
@@ -77,22 +87,33 @@ public void shouldFailIfOnlySeparatorInput() {
     assertThat(e.getMessage(), containsString("expects at least two input arguments"));
   }
 
+  @Test
+  public void shouldFailIfOnlySeparatorBytesInput() {
+    // When:
+    final KsqlException e = assertThrows(KsqlFunctionException.class, () -> udf.concatWS(ByteBuffer.wrap(new byte[] {3})));
+
+    // Then:
+    assertThat(e.getMessage(), containsString("expects at least two input arguments"));
+  }
+
   @Test
   public void shouldWorkWithEmptySeparator() {
-    final Object result = udf.concatWS("", "foo", "bar");
-    assertThat(result, is("foobar"));
+    assertThat(udf.concatWS("", "foo", "bar"), is("foobar"));
+    assertThat(udf.concatWS(EMPTY_BYTES, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})),
+        is(ByteBuffer.wrap(new byte[] {1, 2})));
   }
 
   @Test
   public void shouldHandleEmptyInputs() {
-    final Object result = udf.concatWS("SEP", "foo", "", "bar");
-    assertThat(result, is("fooSEPSEPbar"));
+    assertThat(udf.concatWS("SEP", "foo", "", "bar"), is("fooSEPSEPbar"));
+    assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), EMPTY_BYTES, ByteBuffer.wrap(new byte[] {3})),
+        is(ByteBuffer.wrap(new byte[] {2, 1, 1, 3})));
   }
 
   @Test
   public void shouldReturnEmptyIfEverythingEmpty() {
-    final Object result = udf.concatWS("", "", "");
-    assertThat(result, is(""));
+    assertThat(udf.concatWS("", "", ""), is(""));
+    assertThat(udf.concatWS(EMPTY_BYTES, EMPTY_BYTES, EMPTY_BYTES), is(EMPTY_BYTES));
   }
 
 }
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/plan.json
b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/plan.json new file mode 100644 index 000000000000..f532da3ed062 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, C1 BYTES, C2 BYTES) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `C1` BYTES, `C2` BYTES", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n CONCAT(TEST.C1, null, TEST.C2) THING\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `THING` BYTES", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `C1` BYTES, `C2` BYTES" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "CONCAT(C1, null, C2) AS THING" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/spec.json new file mode 100644 index 000000000000..1b320de07b1a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/spec.json @@ -0,0 +1,110 @@ +{ + "version" : "7.1.0", + "timestamp" : 1627194645370, + "path" : "query-validation-tests/concat.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `C1` BYTES, `C2` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` STRING KEY, `THING` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "concat - bytes", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "C1" : "eWVz", + "C2" : "bm8=" + } + }, { + "topic" : "test_topic", + "key" : null, + "value" : { + "C1" : "", + "C2" : "bm8=" + } + } 
], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "THING" : "eWVzbm8=" + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "THING" : "bm8=" + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, C1 BYTES, C2 BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, CONCAT(C1, NULL, C2) AS THING FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `THING` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `C1` BYTES, `C2` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_-_bytes/7.1.0_1627194645370/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/plan.json new file mode 100644 index 000000000000..629b48c53042 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID BIGINT KEY, S1 BYTES, C1 BYTES, C2 BYTES, C3 BYTES) WITH (KAFKA_TOPIC='input_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` BIGINT KEY, `S1` BYTES, `C1` BYTES, `C2` BYTES, `C3` BYTES", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n CONCAT_WS(INPUT.S1, INPUT.C1, INPUT.C2, null, INPUT.C3) COMBINED\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, 
`COMBINED` BYTES", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `S1` BYTES, `C1` BYTES, `C2` BYTES, `C3` BYTES" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "CONCAT_WS(S1, C1, C2, null, C3) AS COMBINED" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" 
: "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/spec.json new file mode 100644 index 000000000000..1e69c892f7d6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/spec.json @@ -0,0 +1,144 @@ +{ + "version" : "7.1.0", + "timestamp" : 1627194645291, + "path" : "query-validation-tests/concat.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `S1` BYTES, `C1` BYTES, `C2` BYTES, `C3` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `COMBINED` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "concat_ws - bytes - JSON", + "inputs" : [ { + "topic" : "input_topic", + "key" : 1, + "value" : { + "S1" : "YQ==", + "C1" : "Yg==", + "C2" : "eWVz", + "C3" : "bm8=" + } + }, { + "topic" : "input_topic", + "key" : 2, + "value" : { + "S1" : "YQ==", + "C1" : "Yg==", + "C2" : "", + "C3" : "bm8=" + } + }, { + "topic" : "input_topic", + "key" : 3, + "value" : { + "S1" : "YQ==", + "C1" : null, + "C2" : null, + "C3" : null + } + }, { + "topic" : "input_topic", + "key" : 4, + "value" : { + "S1" : null, + "C1" : "Yg==", + "C2" : "eWVz", + "C3" : "bm8=" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "COMBINED" : "YmF5ZXNhbm8=" + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "COMBINED" : "YmFhbm8=" + } + }, { + "topic" : "OUTPUT", + "key" : 3, + "value" : { + "COMBINED" : "" + } + }, { + "topic" : "OUTPUT", + "key" : 4, + "value" : { + "COMBINED" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID BIGINT KEY, S1 BYTES, C1 BYTES, C2 BYTES, C3 BYTES) WITH (kafka_topic='input_topic',value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, CONCAT_WS(S1, C1, C2, NULL, C3) AS COMBINED FROM INPUT;" ], + 
"post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `S1` BYTES, `C1` BYTES, `C2` BYTES, `C3` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `COMBINED` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_bytes_-_JSON/7.1.0_1627194645291/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/plan.json new file mode 100644 index 000000000000..f61eecb28e78 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID BIGINT KEY, S1 STRING, C1 STRING, C2 STRING, C3 STRING) WITH (KAFKA_TOPIC='input_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` BIGINT KEY, `S1` STRING, `C1` STRING, `C2` STRING, `C3` STRING", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n CONCAT_WS(INPUT.S1, INPUT.C1, INPUT.C2, INPUT.C3, null, 'literal') COMBINED\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `COMBINED` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : 
"streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `S1` STRING, `C1` STRING, `C2` STRING, `C3` STRING" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "CONCAT_WS(S1, C1, C2, C3, null, 'literal') AS COMBINED" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", 
+ "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/spec.json new file mode 100644 index 000000000000..6685d75d0251 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/spec.json @@ -0,0 +1,144 @@ +{ + "version" : "7.1.0", + "timestamp" : 1627194645238, + "path" : "query-validation-tests/concat.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `S1` STRING, `C1` STRING, `C2` STRING, `C3` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `COMBINED` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "concat_ws - string - JSON", + "inputs" : [ { + "topic" : "input_topic", + "key" : 1, + "value" : { + "S1" : "SEP", + "C1" : "foo", + "C2" : "bar", + "C3" : "baz" + } + }, { + "topic" : "input_topic", + "key" : 2, + "value" : { + "S1" : "SEP", + "C1" : "foo", + "C2" : null, + "C3" : "baz" + } + }, { + "topic" : "input_topic", + "key" : 3, + "value" : { + "S1" : "SEP", + "C1" : null, + "C2" : null, + "C3" : null + } + }, { + "topic" : "input_topic", + "key" : 4, + "value" : { + "S1" : null, + "C1" : "foo", + "C2" : "bar", + "C3" : "baz" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "COMBINED" : "fooSEPbarSEPbazSEPliteral" + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "COMBINED" : "fooSEPbazSEPliteral" + } + }, { + "topic" : "OUTPUT", + "key" : 3, + "value" : { + "COMBINED" : "literal" + } + }, { + "topic" : "OUTPUT", + "key" : 4, + "value" : { + "COMBINED" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID BIGINT KEY, S1 STRING, C1 STRING, C2 STRING, C3 STRING) WITH (kafka_topic='input_topic',value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, CONCAT_WS(S1, C1, C2, C3, NULL, 'literal') AS COMBINED FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `S1` STRING, `C1` STRING, `C2` STRING, `C3` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `COMBINED` STRING", + "keyFormat" : { + "format" : "KAFKA" + 
}, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/concat_-_concat_ws_-_string_-_JSON/7.1.0_1627194645238/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/concat.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/concat.json index f4f86a81652c..b6416f60a42b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/concat.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/concat.json @@ -4,7 +4,7 @@ ], "tests": [ { - "name": "concat_ws", + "name": "concat_ws - string", "format": ["JSON"], "statements": [ "CREATE STREAM INPUT (ID BIGINT KEY, S1 STRING, C1 STRING, C2 STRING, C3 STRING) WITH (kafka_topic='input_topic',value_format='{FORMAT}');", @@ -23,6 +23,41 @@ {"topic": "OUTPUT", "key": 4, "value": {"COMBINED": null}} ] }, + { + "name": "concat_ws - bytes", + "format": ["JSON"], + "statements": [ + "CREATE STREAM INPUT (ID BIGINT KEY, S1 BYTES, C1 BYTES, C2 BYTES, C3 BYTES) WITH (kafka_topic='input_topic',value_format='{FORMAT}');", + "CREATE STREAM OUTPUT AS SELECT ID, CONCAT_WS(S1, C1, C2, NULL, C3) AS COMBINED FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": 1, "value": {"S1": "YQ==", "C1": "Yg==", "C2": "eWVz", "C3": "bm8="}}, + {"topic": "input_topic", "key": 2, "value": {"S1": "YQ==", "C1": "Yg==", "C2": "", "C3": "bm8="}}, + {"topic": "input_topic", "key": 3, "value": {"S1": "YQ==", "C1": null, "C2": null, "C3": null}}, + {"topic": "input_topic", "key": 4, "value": {"S1": null, "C1": "Yg==", "C2": "eWVz", "C3": "bm8="}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"COMBINED": "YmF5ZXNhbm8="}}, + {"topic": "OUTPUT", "key": 2, "value": {"COMBINED": "YmFhbm8="}}, + {"topic": "OUTPUT", "key": 3, "value": {"COMBINED": ""}}, + {"topic": "OUTPUT", "key": 4, "value": {"COMBINED": null}} + ] + }, + { + "name": "concat - bytes", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, C1 BYTES, C2 BYTES) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, CONCAT(C1, NULL, C2) AS THING FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"C1": "eWVz", "C2": "bm8="}}, + {"topic": "test_topic", "value": {"C1": "", "C2": "bm8="}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"THING":"eWVzbm8="}}, + {"topic": 
"OUTPUT", "value": {"THING":"bm8="}} + ] + }, { "name": "concat fields using CONCAT", "statements": [