From 866798e5643b0219bfe3f200dea2df8299fdd31d Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 23 Jun 2021 16:22:15 -0700 Subject: [PATCH 1/4] feat: add PARSE_TIME and FORMAT_TIME functions --- .../ksqldb-reference/scalar-functions.md | 27 ++++ .../ksql/function/types/ParamTypes.java | 2 + .../io/confluent/ksql/function/UdafTypes.java | 2 + .../function/udf/datetime/FormatTime.java | 69 ++++++++ .../ksql/function/udf/datetime/ParseTime.java | 63 ++++++++ .../function/udf/datetime/FormatTimeTest.java | 93 +++++++++++ .../function/udf/datetime/ParseTimeTest.java | 108 +++++++++++++ .../ksql/execution/function/UdfUtil.java | 2 + .../planned/PlannedTestGeneratorTest.java | 2 +- .../7.0.0_1624490278830/plan.json | 153 ++++++++++++++++++ .../7.0.0_1624490278830/spec.json | 135 ++++++++++++++++ .../7.0.0_1624490278830/topology | 13 ++ .../7.0.0_1624490348282/plan.json | 153 ++++++++++++++++++ .../7.0.0_1624490348282/spec.json | 108 +++++++++++++ .../7.0.0_1624490348282/topology | 13 ++ .../query-validation-tests/format-time.json | 26 +++ .../query-validation-tests/parse-time.json | 21 +++ .../ksql/schema/ksql/JavaToSqlConverter.java | 2 + 18 files changed, 991 insertions(+), 1 deletion(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/topology create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/format-time.json create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/parse-time.json diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index f633daac5516..4621031e1fc9 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1199,6 +1199,33 @@ TIMEZONE is an optional parameter and it is a `java.util.TimeZone` ID format, fo "America/Los_Angeles", "PDT", "Europe/London". For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). +### `FORMAT_TIME` + +Since: 0.20 + +```sql +FORMAT_TIME(timestamp, 'HH:mm:ss.SSS') +``` + +Converts a TIME value into the string representation of the timestamp in the given format. +Single quotes in the time format can be escaped with two successive single quotes, `''`, for +example: `'''T''HH:mm:ssX'`. + +For more information on time formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). + +### `PARSE_TIME` + +Since: 0.20 + +```sql +PARSE_TIME(col1, 'HH:mm:ss.SSS') +``` + +Converts a string value in the given format into a TIME value. Single quotes in the time +format can be escaped with two successive single quotes, `''`, for example: `'''T''HH:mm:ssX'`. + +For more information on time formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). + ### `CONVERT_TZ` ```sql diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java index 2fc55a052273..02f101ee8ef3 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java @@ -156,6 +156,8 @@ private static boolean isPrimitiveMatch( || base == SqlBaseType.BOOLEAN && declared instanceof BooleanType || base == SqlBaseType.DOUBLE && declared instanceof DoubleType || base == SqlBaseType.DECIMAL && declared instanceof DecimalType + || base == SqlBaseType.TIME && declared instanceof TimeType + || base == SqlBaseType.DATE && declared instanceof DateType || base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType || allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared)); // CHECKSTYLE_RULES.ON: BooleanExpressionComplexity diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java index ac9c186f2e16..a206a0eab702 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java @@ -30,6 +30,7 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.math.BigDecimal; +import java.sql.Time; import java.sql.Timestamp; import java.util.List; import java.util.Map; @@ -56,6 +57,7 @@ class UdafTypes { .add(Struct.class) .add(List.class) .add(Map.class) + .add(Time.class) .add(Timestamp.class) .add(TimeUnit.class) .add(Function.class) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java new file mode 100644 index 000000000000..26a19792d417 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java @@ -0,0 +1,69 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.KsqlFunctionException; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Time; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.ExecutionException; + +@UdfDescription( + name = "format_time", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Converts a TIME value into the string representation of the time" + + " in the given format." +) +public class FormatTime { + + private final LoadingCache formatters = + CacheBuilder.newBuilder() + .maximumSize(1000) + .build(CacheLoader.from(DateTimeFormatter::ofPattern)); + + @Udf(description = "Converts a TIME value into the" + + " string representation of the time in the given format." + + " The format pattern should be in the format expected" + + " by java.time.format.DateTimeFormatter") + public String formatTime( + @UdfParameter( + description = "TIME value.") final Time time, + @UdfParameter( + description = "The format pattern should be in the format expected by" + + " java.time.format.DateTimeFormatter.") final String formatPattern) { + if (time == null) { + return null; + } + try { + final DateTimeFormatter formatter = formatters.get(formatPattern); + return LocalTime.ofNanoOfDay(time.getTime() * 1000000).format(formatter); + } catch (ExecutionException | RuntimeException e) { + throw new KsqlFunctionException("Failed to format time " + + LocalTime.ofNanoOfDay(time.getTime() * 1000000) + + " with formatter '" + formatPattern + + "': " + e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java new file mode 100644 index 000000000000..a626243a38e4 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.KsqlFunctionException; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Time; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.ExecutionException; + +@UdfDescription( + name = "parse_time", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Converts a string representation of a time in the given format" + + " into a TIME value." +) +public class ParseTime { + + private final LoadingCache formatters = + CacheBuilder.newBuilder() + .maximumSize(1000) + .build(CacheLoader.from(DateTimeFormatter::ofPattern)); + + @Udf(description = "Converts a string representation of a time in the given format" + + " into the TIME value.") + public Time parseTime( + @UdfParameter( + description = "The string representation of a time.") final String formattedTime, + @UdfParameter( + description = "The format pattern should be in the format expected by" + + " java.time.format.DateTimeFormatter.") final String formatPattern) { + try { + final DateTimeFormatter formatter = formatters.get(formatPattern); + return new Time(LocalTime.parse(formattedTime, formatter).toNanoOfDay() / 1000000); + } catch (ExecutionException | RuntimeException e) { + throw new KsqlFunctionException("Failed to parse time '" + formattedTime + + "' with formatter '" + formatPattern + + "': " + e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java new file mode 100644 index 000000000000..dd53f306cdf8 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; + +import io.confluent.ksql.function.KsqlFunctionException; +import java.sql.Time; +import java.util.stream.IntStream; +import org.junit.Before; +import org.junit.Test; + +public class FormatTimeTest { + + private FormatTime udf; + + @Before + public void setUp() { + udf = new FormatTime(); + } + + @Test + public void shouldConvertTimeToString() { + // When: + final String result = udf.formatTime(new Time(65000), "HHmmss"); + + // Then: + assertThat(result, is("000105")); + } + + @Test + public void shouldSupportEmbeddedChars() { + // When: + final Object result = udf.formatTime(new Time(65000), "HH:mm:ss.SSS'Fred'"); + + // Then: + assertThat(result, is("00:01:05.000Fred")); + } + + @Test + public void shouldThrowIfFormatInvalid() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.formatTime(new Time(65000), "invalid") + ); + + // Then: + assertThat(e.getMessage(), containsString("Failed to format time 00:01:05 with formatter 'invalid'")); + } + + @Test + public void shouldByThreadSafeAndWorkWithManyDifferentFormatters() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + try { + final String pattern = "HH:mm:ss'X" + idx + "'"; + final String result = udf.formatTime(new Time(65000), pattern); + assertThat(result, is("00:01:05X" + idx)); + } catch (final Exception e) { + fail(e.getMessage()); + } + }); + } + + @Test + public void shoudlReturnNull() { + // When: + final Object result = udf.formatTime(null, "HH:mm:ss.SSS"); + + // Then: + assertNull(result); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java new file mode 100644 index 000000000000..d9ccb7be7059 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; + +import io.confluent.ksql.function.KsqlFunctionException; +import java.sql.Time; +import java.util.stream.IntStream; +import org.junit.Before; +import org.junit.Test; + +public class ParseTimeTest { + + private ParseTime udf; + + @Before + public void setUp() { + udf = new ParseTime(); + } + + @Test + public void shouldConvertStringToDate() { + // When: + final Time result = udf.parseTime("000105", "HHmmss"); + + // Then: + assertThat(result.getTime(), is(65000L)); + } + + @Test + public void shouldSupportEmbeddedChars() { + // When: + final Time result = udf.parseTime("000105.000Fred", "HHmmss.SSS'Fred'"); + + // Then: + assertThat(result.getTime(), is(65000L)); + } + + @Test + public void shouldThrowIfFormatInvalid() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.parseTime("000105", "invalid") + ); + + // Then: + assertThat(e.getMessage(), containsString("Failed to parse time '000105' with formatter 'invalid'")); + } + + @Test + public void shouldThrowIfParseFails() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.parseTime("invalid", "HHmmss") + ); + + // Then: + assertThat(e.getMessage(), containsString("Failed to parse time 'invalid' with formatter 'HHmmss'")); + } + + @Test + public void shouldThrowOnEmptyString() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.parseTime("", "HHmmss") + ); + + // Then: + assertThat(e.getMessage(), containsString("Failed to parse time '' with formatter 'HHmmss'")); + } + + @Test + public void shouldBeThreadSafeAndWorkWithManyDifferentFormatters() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + try { + final String sourceDate = "000105X" + idx; + final String pattern = "HHmmss'X" + idx + "'"; + final Time result = udf.parseTime(sourceDate, pattern); + assertThat(result.getTime(), is(65000L)); + } catch (final Exception e) { + fail(e.getMessage()); + } + }); + } +} diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java index 4efb63d091e1..f4c5df65ee84 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java @@ -32,6 +32,7 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.math.BigDecimal; +import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; @@ -56,6 +57,7 @@ public final class UdfUtil { .put(double.class, ParamTypes.DOUBLE) .put(BigDecimal.class, ParamTypes.DECIMAL) .put(Timestamp.class, ParamTypes.TIMESTAMP) + .put(Time.class, ParamTypes.TIME) .put(TimeUnit.class, ParamTypes.INTERVALUNIT) .build(); diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java index 71712a7c7c5b..ca30a2c42efe 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java @@ -35,7 +35,7 @@ public class PlannedTestGeneratorTest { * with your change. Otherwise, {@link PlannedTestsUpToDateTest} fill fail if there are missing * or changed query plans. */ - @Ignore("Comment me out to regenerate the historic plans") + // @Ignore("Comment me out to regenerate the historic plans") @Test public void manuallyGeneratePlans() { PlannedTestGenerator.generatePlans(QueryTranslationTest.findTestCases() diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/plan.json new file mode 100644 index 000000000000..d571da095d28 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, START_TIME TIME, DATE_FORMAT STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `START_TIME` TIME, `DATE_FORMAT` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TIME_STREAM AS SELECT\n TEST.ID ID,\n FORMAT_TIME(TEST.START_TIME, TEST.DATE_FORMAT) CUSTOM_FORMATTED_START_TIME\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TIME_STREAM", + "schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_TIME` STRING", + "topicName" : "TIME_STREAM", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TIME_STREAM", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TIME_STREAM" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `START_TIME` TIME, `DATE_FORMAT` STRING" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "FORMAT_TIME(START_TIME, DATE_FORMAT) AS CUSTOM_FORMATTED_START_TIME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "TIME_STREAM" + }, + "queryId" : "CSAS_TIME_STREAM_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/spec.json new file mode 100644 index 000000000000..aa8358237295 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/spec.json @@ -0,0 +1,135 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624490278830, + "path" : "query-validation-tests/format-time.json", + "schemas" : { + "CSAS_TIME_STREAM_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `START_TIME` TIME, `DATE_FORMAT` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_TIME_STREAM_0.TIME_STREAM" : { + "schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_TIME` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "time to string", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "START_TIME" : 65000, + "DATE_FORMAT" : "HH_mm_ss_SSS" + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "START_TIME" : 1234567, + "DATE_FORMAT" : "HHmmss'AAAAA'" + } + }, { + "topic" : "test_topic", + "key" : 3, + "value" : { + "START_TIME" : 0, + "DATE_FORMAT" : "HH:mm:ss" + } + }, { + "topic" : "test_topic", + "key" : 5, + "value" : { + "DATE_FORMAT" : "HH:mm:ss" + } + } ], + "outputs" : [ { + "topic" : "TIME_STREAM", + "key" : 1, + "value" : { + "CUSTOM_FORMATTED_START_TIME" : "00_01_05_000" + } + }, { + "topic" : "TIME_STREAM", + "key" : 2, + "value" : { + "CUSTOM_FORMATTED_START_TIME" : "002034AAAAA" + } + }, { + "topic" : "TIME_STREAM", + "key" : 3, + "value" : { + "CUSTOM_FORMATTED_START_TIME" : "00:00:00" + } + }, { + "topic" : "TIME_STREAM", + "key" : 5, + "value" : { + "CUSTOM_FORMATTED_START_TIME" : null + } + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TIME_STREAM", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID bigint KEY, START_TIME time, DATE_FORMAT varchar) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM TIME_STREAM AS select ID, format_time(START_TIME, DATE_FORMAT) as CUSTOM_FORMATTED_START_TIME from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `START_TIME` TIME, `DATE_FORMAT` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TIME_STREAM", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_TIME` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "TIME_STREAM", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/topology new file mode 100644 index 000000000000..d7360f56d786 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/format-time_-_time_to_string/7.0.0_1624490278830/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TIME_STREAM) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/plan.json new file mode 100644 index 000000000000..5ce5612efb19 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, ID BIGINT, NAME STRING, TIME STRING, FORMAT STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING, `FORMAT` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n TEST.ID ID,\n PARSE_TIME(TEST.TIME, TEST.FORMAT) TS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`K` STRING KEY, `ID` BIGINT, `TS` TIME", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING, `FORMAT` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "ID AS ID", "PARSE_TIME(TIME, FORMAT) AS TS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/spec.json new file mode 100644 index 000000000000..a66f2c839c45 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/spec.json @@ -0,0 +1,108 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624490348282, + "path" : "query-validation-tests/parse-time.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`K` STRING KEY, `ID` BIGINT, `TS` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING, `FORMAT` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "string to time", + "inputs" : [ { + "topic" : "test_topic", + "key" : "0", + "value" : "0,zero,00:01:05Lit,HH:mm:ss'Lit'" + }, { + "topic" : "test_topic", + "key" : "1", + "value" : "1,zero,11/05/19,HH/mm/ss" + }, { + "topic" : "test_topic", + "key" : "2", + "value" : "2,zero,01:00:00 PM,hh:mm:ss a" + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "0", + "value" : "0,65000" + }, { + "topic" : "TS", + "key" : "1", + "value" : "1,39919000" + }, { + "topic" : "TS", + "key" : "2", + "value" : "2,46800000" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, time varchar, format varchar) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select K, id, parse_time(time, format) as ts from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `TIME` STRING, `FORMAT` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `ID` BIGINT, `TS` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/parse-time_-_string_to_time/7.0.0_1624490348282/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/format-time.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/format-time.json new file mode 100644 index 000000000000..478deda43516 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/format-time.json @@ -0,0 +1,26 @@ +{ + "tests": [ + { + "name": "time to string", + "statements": [ + "CREATE STREAM TEST (ID bigint KEY, START_TIME time, DATE_FORMAT varchar) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM TIME_STREAM AS select ID, format_time(START_TIME, DATE_FORMAT) as CUSTOM_FORMATTED_START_TIME from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"START_TIME": 65000, "DATE_FORMAT": "HH_mm_ss_SSS"}}, + {"topic": "test_topic", "key": 2, "value": {"START_TIME": 1234567, "DATE_FORMAT": "HHmmss'AAAAA'"}}, + {"topic": "test_topic", "key": 3, "value": {"START_TIME": 0, "DATE_FORMAT": "HH:mm:ss"}}, + {"topic": "test_topic", "key": 5, "value": {"DATE_FORMAT": "HH:mm:ss"}} + + ], + "outputs": [ + {"topic": "TIME_STREAM", "key": 1, "value": {"CUSTOM_FORMATTED_START_TIME": "00_01_05_000"}}, + {"topic": "TIME_STREAM", "key": 2, "value": {"CUSTOM_FORMATTED_START_TIME": "002034AAAAA"}}, + {"topic": "TIME_STREAM", "key": 3, "value": {"CUSTOM_FORMATTED_START_TIME": "00:00:00"}}, + {"topic": "TIME_STREAM", "key": 5, "value": {"CUSTOM_FORMATTED_START_TIME": null}} + ] + } + ] +} + + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/parse-time.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/parse-time.json new file mode 100644 index 000000000000..74f0ce23c60a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/parse-time.json @@ -0,0 +1,21 @@ +{ + "tests": [ + { + "name": "string to time", + "statements": [ + "CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, time varchar, format varchar) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select K, id, parse_time(time, format) as ts from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": "0,zero,00:01:05Lit,HH:mm:ss'Lit'"}, + {"topic": "test_topic", "key": "1", "value": "1,zero,11/05/19,HH/mm/ss"}, + {"topic": "test_topic", "key": "2", "value": "2,zero,01:00:00 PM,hh:mm:ss a"} + ], + "outputs": [ + {"topic": "TS", "key": "0", "value": "0,65000"}, + {"topic": "TS", "key": "1", "value": "1,39919000"}, + {"topic": "TS", "key": "2", "value": "2,46800000"} + ] + } + ] +} \ No newline at end of file diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java index 87f70a3adc03..e74f69eedddf 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java @@ -21,6 +21,7 @@ import io.confluent.ksql.schema.utils.SchemaException; import io.confluent.ksql.types.KsqlStruct; import java.math.BigDecimal; +import java.sql.Time; import java.sql.Timestamp; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ class JavaToSqlConverter implements JavaToSqlTypeConverter { .put(List.class, SqlBaseType.ARRAY) .put(Map.class, SqlBaseType.MAP) .put(KsqlStruct.class, SqlBaseType.STRUCT) + .put(Time.class, SqlBaseType.TIME) .put(Timestamp.class, SqlBaseType.TIMESTAMP) .build(); From db63aba77e93f844a1dc7e880b7ff6b2f70d74b1 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 23 Jun 2021 16:23:39 -0700 Subject: [PATCH 2/4] uncomment historic plan generator --- .../confluent/ksql/test/planned/PlannedTestGeneratorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java index ca30a2c42efe..71712a7c7c5b 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestGeneratorTest.java @@ -35,7 +35,7 @@ public class PlannedTestGeneratorTest { * with your change. Otherwise, {@link PlannedTestsUpToDateTest} fill fail if there are missing * or changed query plans. */ - // @Ignore("Comment me out to regenerate the historic plans") + @Ignore("Comment me out to regenerate the historic plans") @Test public void manuallyGeneratePlans() { PlannedTestGenerator.generatePlans(QueryTranslationTest.findTestCases() From 1695d73e3142b7e823cc951e904533e2f0ed3c0d Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 28 Jun 2021 21:24:35 -0700 Subject: [PATCH 3/4] add tests for unsupported fields --- .../ksqldb-reference/scalar-functions.md | 4 ++-- .../ksql/function/udf/datetime/FormatTime.java | 3 ++- .../ksql/function/udf/datetime/ParseTime.java | 18 ++++++++++++++++-- .../function/udf/datetime/FormatTimeTest.java | 11 +++++++++++ .../function/udf/datetime/ParseTimeTest.java | 11 +++++++++++ 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index 4621031e1fc9..416820ad9e23 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1204,10 +1204,10 @@ TIMEZONE is an optional parameter and it is a `java.util.TimeZone` ID format, fo Since: 0.20 ```sql -FORMAT_TIME(timestamp, 'HH:mm:ss.SSS') +FORMAT_TIME(time, 'HH:mm:ss.SSS') ``` -Converts a TIME value into the string representation of the timestamp in the given format. +Converts a TIME value into the string representation of the time in the given format. Single quotes in the time format can be escaped with two successive single quotes, `''`, for example: `'''T''HH:mm:ssX'`. diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java index 26a19792d417..f7f3613ef7a3 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FormatTime.java @@ -28,6 +28,7 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; @UdfDescription( name = "format_time", @@ -58,7 +59,7 @@ public String formatTime( } try { final DateTimeFormatter formatter = formatters.get(formatPattern); - return LocalTime.ofNanoOfDay(time.getTime() * 1000000).format(formatter); + return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(time.getTime())).format(formatter); } catch (ExecutionException | RuntimeException e) { throw new KsqlFunctionException("Failed to format time " + LocalTime.ofNanoOfDay(time.getTime() * 1000000) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java index a626243a38e4..731f7b1bf90f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java @@ -27,7 +27,12 @@ import java.sql.Time; import java.time.LocalTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAccessor; +import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; @UdfDescription( name = "parse_time", @@ -52,8 +57,17 @@ public Time parseTime( description = "The format pattern should be in the format expected by" + " java.time.format.DateTimeFormatter.") final String formatPattern) { try { - final DateTimeFormatter formatter = formatters.get(formatPattern); - return new Time(LocalTime.parse(formattedTime, formatter).toNanoOfDay() / 1000000); + final TemporalAccessor ta = formatters.get(formatPattern).parse(formattedTime); + final Optional dateFieldCount = Arrays.stream(ChronoField.values()) + .filter(field -> field.isDateBased()) + .filter(field -> ta.isSupported(field)) + .findFirst(); + + if (dateFieldCount.isPresent()) { + throw new KsqlFunctionException("Unsupported field: " + dateFieldCount.get().toString()); + } + + return new Time(TimeUnit.NANOSECONDS.toMillis(LocalTime.from(ta).toNanoOfDay())); } catch (ExecutionException | RuntimeException e) { throw new KsqlFunctionException("Failed to parse time '" + formattedTime + "' with formatter '" + formatPattern diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java index dd53f306cdf8..106a22dddd81 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FormatTimeTest.java @@ -46,6 +46,17 @@ public void shouldConvertTimeToString() { assertThat(result, is("000105")); } + @Test + public void shouldRejectUnsupportedFields() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.formatTime(new Time(65000), "yyyy HHmmss")); + + // Then: + assertThat(e.getMessage(), is("Failed to format time 00:01:05 with formatter 'yyyy HHmmss': Unsupported field: YearOfEra")); + } + @Test public void shouldSupportEmbeddedChars() { // When: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java index d9ccb7be7059..6ce506651c7d 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java @@ -45,6 +45,17 @@ public void shouldConvertStringToDate() { assertThat(result.getTime(), is(65000L)); } + @Test + public void shouldThrowOnUnsupportedFields() { + // When: + final Exception e = assertThrows( + KsqlFunctionException.class, + () -> udf.parseTime("2020 000105", "yyyy HHmmss")); + + // Then: + assertThat(e.getMessage(), is("Failed to parse time '2020 000105' with formatter 'yyyy HHmmss': Unsupported field: Year")); + } + @Test public void shouldSupportEmbeddedChars() { // When: From 1e0a25af3cdf351b1c7a319b35e2f35f36daa4b3 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 28 Jun 2021 21:39:06 -0700 Subject: [PATCH 4/4] make error message more general --- .../io/confluent/ksql/function/udf/datetime/ParseTime.java | 6 +++--- .../confluent/ksql/function/udf/datetime/ParseTimeTest.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java index 731f7b1bf90f..c2e2313d3680 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/ParseTime.java @@ -58,13 +58,13 @@ public Time parseTime( + " java.time.format.DateTimeFormatter.") final String formatPattern) { try { final TemporalAccessor ta = formatters.get(formatPattern).parse(formattedTime); - final Optional dateFieldCount = Arrays.stream(ChronoField.values()) + final Optional dateField = Arrays.stream(ChronoField.values()) .filter(field -> field.isDateBased()) .filter(field -> ta.isSupported(field)) .findFirst(); - if (dateFieldCount.isPresent()) { - throw new KsqlFunctionException("Unsupported field: " + dateFieldCount.get().toString()); + if (dateField.isPresent()) { + throw new KsqlFunctionException("Time format contains date field."); } return new Time(TimeUnit.NANOSECONDS.toMillis(LocalTime.from(ta).toNanoOfDay())); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java index 6ce506651c7d..7b1d7040e48b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/ParseTimeTest.java @@ -53,7 +53,7 @@ public void shouldThrowOnUnsupportedFields() { () -> udf.parseTime("2020 000105", "yyyy HHmmss")); // Then: - assertThat(e.getMessage(), is("Failed to parse time '2020 000105' with formatter 'yyyy HHmmss': Unsupported field: Year")); + assertThat(e.getMessage(), is("Failed to parse time '2020 000105' with formatter 'yyyy HHmmss': Time format contains date field.")); } @Test