Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add PARSE_DATE and FORMAT_DATE functions #7733

Merged
merged 5 commits into from
Jul 1, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add PARSE_DATE and FORMAT_DATE functions
Zara Lim committed Jun 26, 2021
commit 142e565a8e2ebe3b5b1dafaec2af4e82b174a388
25 changes: 25 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
@@ -1103,6 +1103,8 @@ ksqlDB Server instances.

Since: -

Deprecated since 0.20.0 (use FORMAT_DATE)

```sql
DATETOSTRING(START_DATE, 'yyyy-MM-dd')
```
@@ -1117,6 +1119,8 @@ The integer represents days since epoch matching the encoding used by

Since: -

Deprecated since 0.20.0 (use PARSE_DATE)

```sql
STRINGTODATE(col1, 'yyyy-MM-dd')
```
@@ -1199,6 +1203,27 @@ TIMEZONE is an optional parameter and it is a `java.util.TimeZone` ID format, fo
"America/Los_Angeles", "PDT", "Europe/London". For more information on timestamp formats, see
[DateTimeFormatter](https://cnfl.io/java-dtf).

### `FORMAT_DATE`

```sql
FORMAT_DATE(date, 'yyyy-MM-dd')
```

Converts a DATE value into a string representing the date in the given format.
Single quotes in the date format can be escaped with two successive single
quotes, `''`, for example: `'yyyy-MM-dd''T'''`.

### `PARSE_DATE`

```sql
PARSE_DATE(col1, 'yyyy-MM-dd')
```

Converts a string representation of a date in the
given format into a DATE value. Single quotes in the date
format can be escaped with two successive single
quotes, `''`, for example: `'yyyy-MM-dd''T'''`.

### `CONVERT_TZ`

```sql
Original file line number Diff line number Diff line change
@@ -156,6 +156,7 @@ private static boolean isPrimitiveMatch(
|| base == SqlBaseType.BOOLEAN && declared instanceof BooleanType
|| base == SqlBaseType.DOUBLE && declared instanceof DoubleType
|| base == SqlBaseType.DECIMAL && declared instanceof DecimalType
|| base == SqlBaseType.DATE && declared instanceof DateType
|| base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType
|| allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared));
// CHECKSTYLE_RULES.ON: BooleanExpressionComplexity
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
@@ -56,6 +57,7 @@ class UdafTypes {
.add(Struct.class)
.add(List.class)
.add(Map.class)
.add(Date.class)
.add(Timestamp.class)
.add(TimeUnit.class)
.add(Function.class)
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "format_date",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a DATE value to a string"
+ " using the given format pattern. The format pattern should be"
+ " in the format expected by java.time.format.DateTimeFormatter."
)
public class FormatDate {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts the number of days since 1970-01-01 00:00:00 UTC/GMT to a date "
+ "string using the given format pattern. The format pattern should be in the format"
+ " expected by java.time.format.DateTimeFormatter")
public String formatDate(
@UdfParameter(
description = "The date to convert") final Date date,
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
if (date == null) {
return null;
}
try {
final DateTimeFormatter formatter = formatters.get(formatPattern);
return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(date.getTime())).format(formatter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what if formatPattern contains time characters? Would those be part of the final String?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for this.

} catch (final ExecutionException | RuntimeException e) {
throw new KsqlFunctionException("Failed to format date " + date
+ " with formatter '" + formatPattern
+ "': " + e.getMessage(), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
    name = "parse_date",
    category = FunctionCategory.DATE_TIME,
    author = KsqlConstants.CONFLUENT_AUTHOR,
    description = "Converts a string representation of a date in the given format"
        + " into a DATE value. The format pattern should be in the format expected by"
        + " java.time.format.DateTimeFormatter"
)
public class ParseDate {

  // DateTimeFormatter instances are immutable and thread-safe, so a bounded
  // cache lets concurrent invocations reuse compiled patterns instead of
  // re-parsing the pattern string on every row.
  private final LoadingCache<String, DateTimeFormatter> formatters =
      CacheBuilder.newBuilder()
          .maximumSize(1000)
          .build(CacheLoader.from(DateTimeFormatter::ofPattern));

  /**
   * Parses a string into a DATE value.
   *
   * @param formattedDate the string to parse; {@code null} yields {@code null}
   * @param formatPattern a {@link DateTimeFormatter} pattern
   * @return the parsed date, or {@code null} if {@code formattedDate} is null
   * @throws KsqlFunctionException if the pattern is invalid or parsing fails
   */
  @Udf(description = "Converts a string representation of a date in the given format"
      + " into a DATE value.")
  public Date parseDate(
      @UdfParameter(
          description = "The string representation of a date.") final String formattedDate,
      @UdfParameter(
          description = "The format pattern should be in the format expected by"
              + " java.time.format.DateTimeFormatter.") final String formatPattern) {
    // Propagate null like other datetime UDFs (e.g. FORMAT_DATE) rather than
    // surfacing a wrapped NullPointerException.
    if (formattedDate == null) {
      return null;
    }
    try {
      final DateTimeFormatter formatter = formatters.get(formatPattern);
      return new Date(
          TimeUnit.DAYS.toMillis(LocalDate.parse(formattedDate, formatter).toEpochDay()));
    } catch (final ExecutionException | RuntimeException e) {
      throw new KsqlFunctionException("Failed to parse date '" + formattedDate
          + "' with formatter '" + formatPattern
          + "': " + e.getMessage(), e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import io.confluent.ksql.function.KsqlFunctionException;
import java.sql.Date;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;

public class FormatDateTest {

private FormatDate udf;

@Before
public void setUp() {
udf = new FormatDate();
}

@Test
public void shouldConvertDateToString() {
// When:
final String result = udf.formatDate(Date.valueOf("2014-11-09"), "yyyy-MM-dd");

// Then:
assertThat(result, is("2014-11-09"));
}

@Test
public void shouldRoundTripWithStringToDate() {
final String format = "dd/MM/yyyy'Freya'";
final ParseDate parseDate = new ParseDate();
IntStream.range(-10_000, 20_000)
.parallel()
.forEach(idx -> {
final String result = udf.formatDate(new Date(idx * 86400000L), format);
final Date date = parseDate.parseDate(result, format);
assertThat(date.getTime(), is(idx * 86400000L));
});
}

@Test
public void shouldSupportEmbeddedChars() {
// When:
final Object result = udf.formatDate(Date.valueOf("2014-11-09"), "yyyy-dd-MM'Fred'");

// Then:
assertThat(result, is("2014-09-11Fred"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this. Should we support embedded chars in the final date string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was supported in the old DateToString function:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tested Mysql and it supports this too. Perhaps it is allowed in order to write the format you want, i.e. dates yyyy/MM/dd or yyyy-MM-dd or Today is Year = yyyy Month = MM Day = dd

}

@Test
public void shouldThrowIfFormatInvalid() {
// When:
final Exception e = assertThrows(
KsqlFunctionException.class,
() -> udf.formatDate(Date.valueOf("2014-11-09"), "invalid")
);

// Then:
assertThat(e.getMessage(), containsString("Failed to format date 2014-11-09 with formatter 'invalid'"));
}

@Test
public void shouldByThreadSafeAndWorkWithManyDifferentFormatters() {
IntStream.range(0, 10_000)
.parallel()
.forEach(idx -> {
try {
final String pattern = "yyyy-MM-dd'X" + idx + "'";
final String result = udf.formatDate(Date.valueOf("2021-05-18"), pattern);
assertThat(result, is("2021-05-18X" + idx));
} catch (final Exception e) {
fail(e.getMessage());
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import io.confluent.ksql.function.KsqlFunctionException;
import java.sql.Date;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;

public class ParseDateTest {

  // 2021-12-01T00:00:00 UTC expressed as epoch millis.
  private static final long DEC_FIRST_2021_MS = 1638316800000L;

  private ParseDate udf;

  @Before
  public void setUp() {
    udf = new ParseDate();
  }

  @Test
  public void shouldConvertStringToDate() {
    // When:
    final Date parsed = udf.parseDate("2021-12-01", "yyyy-MM-dd");

    // Then:
    assertThat(parsed.getTime(), is(DEC_FIRST_2021_MS));
  }

  @Test
  public void shouldSupportEmbeddedChars() {
    // When:
    final Date parsed = udf.parseDate("2021-12-01Fred", "yyyy-MM-dd'Fred'");

    // Then:
    assertThat(parsed.getTime(), is(DEC_FIRST_2021_MS));
  }

  @Test
  public void shouldThrowIfFormatInvalid() {
    assertParseFails("2021-12-01", "invalid",
        "Failed to parse date '2021-12-01' with formatter 'invalid'");
  }

  @Test
  public void shouldThrowIfParseFails() {
    assertParseFails("invalid", "yyyy-MM-dd",
        "Failed to parse date 'invalid' with formatter 'yyyy-MM-dd'");
  }

  @Test
  public void shouldThrowOnEmptyString() {
    assertParseFails("", "yyyy-MM-dd",
        "Failed to parse date '' with formatter 'yyyy-MM-dd'");
  }

  @Test
  public void shouldBeThreadSafeAndWorkWithManyDifferentFormatters() {
    // Hammer the formatter cache from many threads with distinct patterns.
    IntStream.range(0, 10_000)
        .parallel()
        .forEach(i -> {
          try {
            final Date parsed = udf.parseDate("2021-12-01X" + i, "yyyy-MM-dd'X" + i + "'");
            assertThat(parsed.getTime(), is(DEC_FIRST_2021_MS));
          } catch (final Exception e) {
            fail(e.getMessage());
          }
        });
  }

  // Asserts that parsing fails with a KsqlFunctionException whose message
  // contains the expected text.
  private void assertParseFails(
      final String formattedDate,
      final String pattern,
      final String expectedMessage) {
    final Exception e = assertThrows(
        KsqlFunctionException.class,
        () -> udf.parseDate(formattedDate, pattern)
    );
    assertThat(e.getMessage(), containsString(expectedMessage));
  }
}
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -55,6 +56,7 @@ public final class UdfUtil {
.put(Double.class, ParamTypes.DOUBLE)
.put(double.class, ParamTypes.DOUBLE)
.put(BigDecimal.class, ParamTypes.DECIMAL)
.put(Date.class, ParamTypes.DATE)
.put(Timestamp.class, ParamTypes.TIMESTAMP)
.put(TimeUnit.class, ParamTypes.INTERVALUNIT)
.build();
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID BIGINT KEY, START_DATE DATE, DATE_FORMAT STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` BIGINT KEY, `START_DATE` DATE, `DATE_FORMAT` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM DATE_STREAM AS SELECT\n TEST.ID ID,\n FORMAT_DATE(TEST.START_DATE, TEST.DATE_FORMAT) CUSTOM_FORMATTED_START_DATE\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "DATE_STREAM",
"schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_DATE` STRING",
"topicName" : "DATE_STREAM",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "DATE_STREAM",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "DATE_STREAM"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` BIGINT KEY, `START_DATE` DATE, `DATE_FORMAT` STRING"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "FORMAT_DATE(START_DATE, DATE_FORMAT) AS CUSTOM_FORMATTED_START_DATE" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "DATE_STREAM"
},
"queryId" : "CSAS_DATE_STREAM_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
{
"version" : "7.0.0",
"timestamp" : 1624668401000,
"path" : "query-validation-tests/format-date.json",
"schemas" : {
"CSAS_DATE_STREAM_0.KsqlTopic.Source" : {
"schema" : "`ID` BIGINT KEY, `START_DATE` DATE, `DATE_FORMAT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"CSAS_DATE_STREAM_0.DATE_STREAM" : {
"schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_DATE` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"testCase" : {
"name" : "date to string",
"inputs" : [ {
"topic" : "test_topic",
"key" : 1,
"value" : {
"START_DATE" : 17662,
"DATE_FORMAT" : "yyyy-MM-dd"
}
}, {
"topic" : "test_topic",
"key" : 2,
"value" : {
"START_DATE" : 18027,
"DATE_FORMAT" : "dd/MM/yyyy"
}
}, {
"topic" : "test_topic",
"key" : 3,
"value" : {
"START_DATE" : 18993,
"DATE_FORMAT" : "dd-MMM-yyyy"
}
}, {
"topic" : "test_topic",
"key" : 4,
"value" : {
"START_DATE" : 0,
"DATE_FORMAT" : "dd-MM-yyyy"
}
}, {
"topic" : "test_topic",
"key" : 5,
"value" : {
"START_DATE" : -1,
"DATE_FORMAT" : "dd-MM-yyyy'Sophia'"
}
} ],
"outputs" : [ {
"topic" : "DATE_STREAM",
"key" : 1,
"value" : {
"CUSTOM_FORMATTED_START_DATE" : "2018-05-11"
}
}, {
"topic" : "DATE_STREAM",
"key" : 2,
"value" : {
"CUSTOM_FORMATTED_START_DATE" : "11/05/2019"
}
}, {
"topic" : "DATE_STREAM",
"key" : 3,
"value" : {
"CUSTOM_FORMATTED_START_DATE" : "01-Jan-2022"
}
}, {
"topic" : "DATE_STREAM",
"key" : 4,
"value" : {
"CUSTOM_FORMATTED_START_DATE" : "01-01-1970"
}
}, {
"topic" : "DATE_STREAM",
"key" : 5,
"value" : {
"CUSTOM_FORMATTED_START_DATE" : "31-12-1969Sophia"
}
} ],
"topics" : [ {
"name" : "DATE_STREAM",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID bigint KEY, START_DATE DATE, DATE_FORMAT varchar) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM DATE_STREAM AS select ID, format_date(START_DATE, DATE_FORMAT) as CUSTOM_FORMATTED_START_DATE from test;" ],
"post" : {
"sources" : [ {
"name" : "DATE_STREAM",
"type" : "STREAM",
"schema" : "`ID` BIGINT KEY, `CUSTOM_FORMATTED_START_DATE` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "JSON",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`ID` BIGINT KEY, `START_DATE` DATE, `DATE_FORMAT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "JSON",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "test_topic",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "DATE_STREAM",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: DATE_STREAM)
<-- Project

Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, ID BIGINT, NAME STRING, DATE STRING, FORMAT STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `DATE` STRING, `FORMAT` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n TEST.ID ID,\n PARSE_DATE(TEST.DATE, TEST.FORMAT) TS\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TS",
"schema" : "`K` STRING KEY, `ID` BIGINT, `TS` DATE",
"topicName" : "TS",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TS",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TS"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `DATE` STRING, `FORMAT` STRING"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "ID AS ID", "PARSE_DATE(DATE, FORMAT) AS TS" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "TS"
},
"queryId" : "CSAS_TS_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{
"version" : "7.0.0",
"timestamp" : 1624668480830,
"path" : "query-validation-tests/parse-date.json",
"schemas" : {
"CSAS_TS_0.TS" : {
"schema" : "`K` STRING KEY, `ID` BIGINT, `TS` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"CSAS_TS_0.KsqlTopic.Source" : {
"schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `DATE` STRING, `FORMAT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
}
},
"testCase" : {
"name" : "string to date",
"inputs" : [ {
"topic" : "test_topic",
"key" : "0",
"value" : "0,zero,2018-05-11Lit,yyyy-MM-dd'Lit'"
}, {
"topic" : "test_topic",
"key" : "1",
"value" : "1,zero,11/05/2019,dd/MM/yyyy"
}, {
"topic" : "test_topic",
"key" : "2",
"value" : "2,zero,01-Jan-2022,dd-MMM-yyyy"
}, {
"topic" : "test_topic",
"key" : "3",
"value" : "3,yyy,01-01-1970,dd-MM-yyyy"
} ],
"outputs" : [ {
"topic" : "TS",
"key" : "0",
"value" : "0,17662"
}, {
"topic" : "TS",
"key" : "1",
"value" : "1,18027"
}, {
"topic" : "TS",
"key" : "2",
"value" : "2,18993"
}, {
"topic" : "TS",
"key" : "3",
"value" : "3,0"
} ],
"topics" : [ {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "TS",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, date varchar, format varchar) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select K, id, parse_date(date, format) as ts from test;" ],
"post" : {
"sources" : [ {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`K` STRING KEY, `ID` BIGINT, `NAME` STRING, `DATE` STRING, `FORMAT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "TS",
"type" : "STREAM",
"schema" : "`K` STRING KEY, `ID` BIGINT, `TS` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "test_topic",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
}, {
"name" : "TS",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: TS)
<-- Project

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"tests": [
{
"name": "date to string",
"statements": [
"CREATE STREAM TEST (ID bigint KEY, START_DATE DATE, DATE_FORMAT varchar) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM DATE_STREAM AS select ID, format_date(START_DATE, DATE_FORMAT) as CUSTOM_FORMATTED_START_DATE from test;"
],
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"START_DATE": 17662, "DATE_FORMAT": "yyyy-MM-dd"}},
{"topic": "test_topic", "key": 2, "value": {"START_DATE": 18027, "DATE_FORMAT": "dd/MM/yyyy"}},
{"topic": "test_topic", "key": 3, "value": {"START_DATE": 18993, "DATE_FORMAT": "dd-MMM-yyyy"}},
{"topic": "test_topic", "key": 4, "value": {"START_DATE": 0, "DATE_FORMAT": "dd-MM-yyyy"}},
{"topic": "test_topic", "key": 5, "value": {"START_DATE": -1, "DATE_FORMAT": "dd-MM-yyyy'Sophia'"}}

],
"outputs": [
{"topic": "DATE_STREAM", "key": 1, "value": {"CUSTOM_FORMATTED_START_DATE": "2018-05-11"}},
{"topic": "DATE_STREAM", "key": 2, "value": {"CUSTOM_FORMATTED_START_DATE": "11/05/2019"}},
{"topic": "DATE_STREAM", "key": 3, "value": {"CUSTOM_FORMATTED_START_DATE": "01-Jan-2022"}},
{"topic": "DATE_STREAM", "key": 4, "value": {"CUSTOM_FORMATTED_START_DATE": "01-01-1970"}},
{"topic": "DATE_STREAM", "key": 5, "value": {"CUSTOM_FORMATTED_START_DATE": "31-12-1969Sophia"}}
]
}
]
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"tests": [
{
"name": "string to date",
"statements": [
"CREATE STREAM TEST (K STRING KEY, ID bigint, NAME varchar, date varchar, format varchar) WITH (kafka_topic='test_topic', value_format='DELIMITED');",
"CREATE STREAM TS AS select K, id, parse_date(date, format) as ts from test;"
],
"inputs": [
{"topic": "test_topic", "key": "0", "value": "0,zero,2018-05-11Lit,yyyy-MM-dd'Lit'"},
{"topic": "test_topic", "key": "1", "value": "1,zero,11/05/2019,dd/MM/yyyy"},
{"topic": "test_topic", "key": "2", "value": "2,zero,01-Jan-2022,dd-MMM-yyyy"},
{"topic": "test_topic", "key": "3", "value": "3,yyy,01-01-1970,dd-MM-yyyy"}
],
"outputs": [
{"topic": "TS", "key": "0", "value": "0,17662"},
{"topic": "TS", "key": "1", "value": "1,18027"},
{"topic": "TS", "key": "2", "value": "2,18993"},
{"topic": "TS", "key": "3", "value": "3,0"}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import io.confluent.ksql.schema.utils.SchemaException;
import io.confluent.ksql.types.KsqlStruct;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@ class JavaToSqlConverter implements JavaToSqlTypeConverter {
.put(List.class, SqlBaseType.ARRAY)
.put(Map.class, SqlBaseType.MAP)
.put(KsqlStruct.class, SqlBaseType.STRUCT)
.put(Date.class, SqlBaseType.DATE)
.put(Timestamp.class, SqlBaseType.TIMESTAMP)
.build();