Skip to content

Commit

Permalink
feat: add PARSE_TIME and FORMAT_TIME functions (#7722)
Browse files Browse the repository at this point in the history
* feat: add PARSE_TIME and FORMAT_TIME functions

* uncomment historic plan generator

* add tests for unsupported fields

* make error message more general
  • Loading branch information
Zara Lim authored Jun 30, 2021
1 parent 35b1cad commit 9a381a8
Show file tree
Hide file tree
Showing 17 changed files with 1,027 additions and 0 deletions.
27 changes: 27 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,33 @@ TIMEZONE is an optional parameter and it is a `java.util.TimeZone` ID format, fo
"America/Los_Angeles", "PDT", "Europe/London". For more information on timestamp formats, see
[DateTimeFormatter](https://cnfl.io/java-dtf).

### `FORMAT_TIME`

Since: 0.20

```sql
FORMAT_TIME(time, 'HH:mm:ss.SSS')
```

Converts a TIME value into the string representation of the time in the given format.
Single quotes in the time format can be escaped with two successive single quotes, `''`, for
example: `'''T''HH:mm:ssX'`.

For more information on time formats, see [DateTimeFormatter](https://cnfl.io/java-dtf).

### `PARSE_TIME`

Since: 0.20

```sql
PARSE_TIME(col1, 'HH:mm:ss.SSS')
```

Converts a string value in the given format into a TIME value. Single quotes in the time
format can be escaped with two successive single quotes, `''`, for example: `'''T''HH:mm:ssX'`.

For more information on time formats, see [DateTimeFormatter](https://cnfl.io/java-dtf).

### `CONVERT_TZ`

```sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ private static boolean isPrimitiveMatch(
|| base == SqlBaseType.BOOLEAN && declared instanceof BooleanType
|| base == SqlBaseType.DOUBLE && declared instanceof DoubleType
|| base == SqlBaseType.DECIMAL && declared instanceof DecimalType
|| base == SqlBaseType.TIME && declared instanceof TimeType
|| base == SqlBaseType.DATE && declared instanceof DateType
|| base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType
|| allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared));
// CHECKSTYLE_RULES.ON: BooleanExpressionComplexity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.math.BigDecimal;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +57,7 @@ class UdafTypes {
.add(Struct.class)
.add(List.class)
.add(Map.class)
.add(Time.class)
.add(Timestamp.class)
.add(TimeUnit.class)
.add(Function.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Time;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "format_time",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a TIME value into the string representation of the time"
+ " in the given format."
)
public class FormatTime {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts a TIME value into the"
+ " string representation of the time in the given format."
+ " The format pattern should be in the format expected"
+ " by java.time.format.DateTimeFormatter")
public String formatTime(
@UdfParameter(
description = "TIME value.") final Time time,
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
if (time == null) {
return null;
}
try {
final DateTimeFormatter formatter = formatters.get(formatPattern);
return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(time.getTime())).format(formatter);
} catch (ExecutionException | RuntimeException e) {
throw new KsqlFunctionException("Failed to format time "
+ LocalTime.ofNanoOfDay(time.getTime() * 1000000)
+ " with formatter '" + formatPattern
+ "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Time;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "parse_time",
category = FunctionCategory.DATE_TIME,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a string representation of a time in the given format"
+ " into a TIME value."
)
public class ParseTime {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts a string representation of a time in the given format"
+ " into the TIME value.")
public Time parseTime(
@UdfParameter(
description = "The string representation of a time.") final String formattedTime,
@UdfParameter(
description = "The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter.") final String formatPattern) {
try {
final TemporalAccessor ta = formatters.get(formatPattern).parse(formattedTime);
final Optional<ChronoField> dateField = Arrays.stream(ChronoField.values())
.filter(field -> field.isDateBased())
.filter(field -> ta.isSupported(field))
.findFirst();

if (dateField.isPresent()) {
throw new KsqlFunctionException("Time format contains date field.");
}

return new Time(TimeUnit.NANOSECONDS.toMillis(LocalTime.from(ta).toNanoOfDay()));
} catch (ExecutionException | RuntimeException e) {
throw new KsqlFunctionException("Failed to parse time '" + formattedTime
+ "' with formatter '" + formatPattern
+ "': " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import io.confluent.ksql.function.KsqlFunctionException;
import java.sql.Time;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;

public class FormatTimeTest {

private FormatTime udf;

@Before
public void setUp() {
udf = new FormatTime();
}

@Test
public void shouldConvertTimeToString() {
// When:
final String result = udf.formatTime(new Time(65000), "HHmmss");

// Then:
assertThat(result, is("000105"));
}

@Test
public void shouldRejectUnsupportedFields() {
// When:
final Exception e = assertThrows(
KsqlFunctionException.class,
() -> udf.formatTime(new Time(65000), "yyyy HHmmss"));

// Then:
assertThat(e.getMessage(), is("Failed to format time 00:01:05 with formatter 'yyyy HHmmss': Unsupported field: YearOfEra"));
}

@Test
public void shouldSupportEmbeddedChars() {
// When:
final Object result = udf.formatTime(new Time(65000), "HH:mm:ss.SSS'Fred'");

// Then:
assertThat(result, is("00:01:05.000Fred"));
}

@Test
public void shouldThrowIfFormatInvalid() {
// When:
final Exception e = assertThrows(
KsqlFunctionException.class,
() -> udf.formatTime(new Time(65000), "invalid")
);

// Then:
assertThat(e.getMessage(), containsString("Failed to format time 00:01:05 with formatter 'invalid'"));
}

@Test
public void shouldByThreadSafeAndWorkWithManyDifferentFormatters() {
IntStream.range(0, 10_000)
.parallel()
.forEach(idx -> {
try {
final String pattern = "HH:mm:ss'X" + idx + "'";
final String result = udf.formatTime(new Time(65000), pattern);
assertThat(result, is("00:01:05X" + idx));
} catch (final Exception e) {
fail(e.getMessage());
}
});
}

@Test
public void shoudlReturnNull() {
// When:
final Object result = udf.formatTime(null, "HH:mm:ss.SSS");

// Then:
assertNull(result);
}
}
Loading

0 comments on commit 9a381a8

Please sign in to comment.