Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: timestamp support - casting, comparisons and serde #6806

Merged
merged 5 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion design-proposals/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ Next KLIP number: **44**
| KLIP-40: Prepared Statements for Java Client | Proposal | | | |
| [KLIP-41: ksqlDB .NET Client](klip-41-ksqldb-.net-client.md) | Proposal | | | [Discussion](https://github.com/confluentinc/ksql/pull/6613)|
| KLIP-42: Schema Migrations Tool | Proposal | | | |
| KLIP-43: TIMESTAMP data type support | Proposal | | | [Discussion](https://github.com/confluentinc/ksql/pull/6649) |
| [KLIP-43: TIMESTAMP data type support ](klip-43-timestamp-data-type-support.md) | Approved | | | [Discussion](https://github.com/confluentinc/ksql/pull/6649) |
4 changes: 2 additions & 2 deletions design-proposals/klip-43-timestamp-data-type-support.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# KLIP-43: TIMESTAMP Data Type Support

**Author**: @jzaralim |
**Release Target**: 0.15, 0.16 |
**Status**: In Discussion |
**Release Target**: 0.16 |
**Status**: Approved |
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
**Discussion**: https://github.com/confluentinc/ksql/pull/6649

**tl;dr:** _Add support for TIMESTAMP column types in ksqlDB. This will allow users to easily migrate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.confluent.ksql.function.types.ArrayType;
import io.confluent.ksql.function.types.BooleanType;
import io.confluent.ksql.function.types.DecimalType;
import io.confluent.ksql.function.types.DoubleType;
import io.confluent.ksql.function.types.GenericType;
import io.confluent.ksql.function.types.IntegerType;
import io.confluent.ksql.function.types.LongType;
import io.confluent.ksql.function.types.MapType;
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.function.types.StringType;
import io.confluent.ksql.function.types.StructType;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.types.SqlArray;
Expand Down Expand Up @@ -220,18 +214,9 @@ private static boolean resolveGenerics(
}

private static boolean matches(final ParamType schema, final SqlType instance) {
switch (instance.baseType()) {
case BOOLEAN: return schema instanceof BooleanType;
case INTEGER: return schema instanceof IntegerType;
case BIGINT: return schema instanceof LongType;
case DECIMAL: return schema instanceof DecimalType;
case DOUBLE: return schema instanceof DoubleType;
case STRING: return schema instanceof StringType;
case ARRAY: return schema instanceof ArrayType;
case MAP: return schema instanceof MapType;
case STRUCT: return schema instanceof StructType;
default: return false;
}
final ParamType instanceParamType = SchemaConverters
.sqlToFunctionConverter().toFunctionType(instance);
return schema.getClass() == instanceParamType.getClass();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ private ParamTypes() {
public static final StringType STRING = StringType.INSTANCE;
public static final LongType LONG = LongType.INSTANCE;
public static final ParamType DECIMAL = DecimalType.INSTANCE;
public static final TimestampType TIMESTAMP = TimestampType.INSTANCE;

public static boolean areCompatible(final SqlType actual, final ParamType declared) {
return areCompatible(actual, declared, false);
Expand Down Expand Up @@ -102,6 +103,7 @@ private static boolean isPrimitiveMatch(
|| base == SqlBaseType.BOOLEAN && declared instanceof BooleanType
|| base == SqlBaseType.DOUBLE && declared instanceof DoubleType
|| base == SqlBaseType.DECIMAL && declared instanceof DecimalType
|| base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType
|| allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared));
// CHECKSTYLE_RULES.ON: BooleanExpressionComplexity
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.types;

public final class TimestampType extends ObjectType {
public static final TimestampType INSTANCE = new TimestampType();

private TimestampType() {
}

@Override
public int hashCode() {
return 0;
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean equals(final Object obj) {
return obj instanceof TimestampType;
}

@Override
public String toString() {
return "TIMESTAMP";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

/**
* Util class for converting between KSQL's different type systems.
Expand Down Expand Up @@ -215,7 +216,8 @@ private static final class ConnectToSqlConverter implements ConnectToSqlTypeConv
private static final Map<Schema.Type, Function<Schema, SqlType>> CONNECT_TO_SQL = ImmutableMap
.<Schema.Type, Function<Schema, SqlType>>builder()
.put(Schema.Type.INT32, s -> SqlTypes.INTEGER)
.put(Schema.Type.INT64, s -> SqlTypes.BIGINT)
.put(Schema.Type.INT64, s ->
s.name() == Timestamp.LOGICAL_NAME ? SqlTypes.TIMESTAMP : SqlTypes.BIGINT)
.put(Schema.Type.FLOAT64, s -> SqlTypes.DOUBLE)
.put(Schema.Type.BOOLEAN, s -> SqlTypes.BOOLEAN)
.put(Schema.Type.STRING, s -> SqlTypes.STRING)
Expand Down Expand Up @@ -275,6 +277,7 @@ private static final class ConnectFromSqlConverter implements SqlToConnectTypeCo
.put(SqlBaseType.ARRAY, t -> ConnectFromSqlConverter.fromSqlArray((SqlArray) t))
.put(SqlBaseType.MAP, t -> ConnectFromSqlConverter.fromSqlMap((SqlMap) t))
.put(SqlBaseType.STRUCT, t -> ConnectFromSqlConverter.fromSqlStruct((SqlStruct) t))
.put(SqlBaseType.TIMESTAMP, t -> Timestamp.builder().optional())
.build();

@Override
Expand Down Expand Up @@ -339,6 +342,7 @@ private static class JavaToSqlConverter implements JavaToSqlTypeConverter {
.put(List.class, SqlBaseType.ARRAY)
.put(Map.class, SqlBaseType.MAP)
.put(Struct.class, SqlBaseType.STRUCT)
.put(java.sql.Timestamp.class, SqlBaseType.TIMESTAMP)
.build();

@Override
Expand Down Expand Up @@ -376,6 +380,7 @@ private static class FunctionToSql implements FunctionToSqlConverter {
.put(ParamTypes.INTEGER, SqlTypes.INTEGER)
.put(ParamTypes.LONG, SqlTypes.BIGINT)
.put(ParamTypes.DOUBLE, SqlTypes.DOUBLE)
.put(ParamTypes.TIMESTAMP, SqlTypes.TIMESTAMP)
.build();

@Override
Expand Down Expand Up @@ -417,6 +422,7 @@ private static class FunctionToSqlBase implements FunctionToSqlBaseConverter {
.put(ParamTypes.LONG, SqlBaseType.BIGINT)
.put(ParamTypes.DOUBLE, SqlBaseType.DOUBLE)
.put(ParamTypes.DECIMAL, SqlBaseType.DECIMAL)
.put(ParamTypes.TIMESTAMP, SqlBaseType.TIMESTAMP)
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class SqlTypeWalker {
.put(SqlBaseType.DOUBLE, (v, t) -> v.visitDouble((SqlPrimitiveType) t))
.put(SqlBaseType.STRING, (v, t) -> v.visitString((SqlPrimitiveType) t))
.put(SqlBaseType.DECIMAL, (v, t) -> v.visitDecimal((SqlDecimal) t))
.put(SqlBaseType.TIMESTAMP, (v, t) -> v.visitTimestamp((SqlPrimitiveType) t))
.put(SqlBaseType.ARRAY, SqlTypeWalker::visitArray)
.put(SqlBaseType.MAP, SqlTypeWalker::visitMap)
.put(SqlBaseType.STRUCT, SqlTypeWalker::visitStruct)
Expand Down Expand Up @@ -85,6 +86,10 @@ default S visitDecimal(final SqlDecimal type) {
return visitType(type);
}

default S visitTimestamp(final SqlPrimitiveType type) {
return visitPrimitive(type);
}

default S visitArray(final SqlArray type, final S element) {
return visitType(type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.sql.Timestamp;
import java.time.ZoneId;

/**
Expand All @@ -31,12 +32,16 @@ public class PartialStringToTimestampParser {
private static final String HELP_MESSAGE = System.lineSeparator()
+ "Required format is: \"" + KsqlConstants.DATE_TIME_PATTERN + "\", "
+ "with an optional numeric 4-digit timezone, for example: "
+ "'2020-05-26T23.59.58.000' or with tz: '2020-05-26T23.59.58.000+0200'. "
+ "'2020-05-26T23:59:58.000' or with tz: '2020-05-26T23:59:58.000+0200'. "
+ "Partials are also supported, for example \"2020-05-26\"";

private static final StringToTimestampParser PARSER =
new StringToTimestampParser(KsqlConstants.DATE_TIME_PATTERN);

public Timestamp parseToTimestamp(final String text) {
return new Timestamp(parse(text));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse is gonna parse strings with timezones if the string contains +0200 for instance. Isn't this going to cause an issue?

}

@SuppressWarnings("MethodMayBeStatic") // Non-static to support DI.
public long parse(final String text) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DOUBLE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.INTEGER;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.STRING;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.TIMESTAMP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class OperatorTest {
.put(SqlBaseType.DECIMAL, SqlTypes.decimal(2, 1))
.put(SqlBaseType.DOUBLE, DOUBLE)
.put(SqlBaseType.STRING, STRING)
.put(SqlBaseType.TIMESTAMP, TIMESTAMP)
.put(SqlBaseType.ARRAY, SqlTypes.array(BIGINT))
.put(SqlBaseType.MAP, SqlTypes.map(SqlTypes.STRING, INTEGER))
.put(SqlBaseType.STRUCT, SqlTypes.struct().field("f", INTEGER).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -62,13 +63,16 @@ public class SchemaConvertersTest {
private static final Schema CONNECT_BIGINT_SCHEMA = SchemaBuilder.int64().optional().build();
private static final Schema CONNECT_DOUBLE_SCHEMA = SchemaBuilder.float64().optional().build();
private static final Schema CONNECT_STRING_SCHEMA = SchemaBuilder.string().optional().build();
private static final Schema CONNECT_TIMESTAMP_SCHEMA =
SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp").version(1).optional().build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is version 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a part of the schema from the Timestamp class. This ended up being replaced by the builder from the Timestamp class though.


private static final BiMap<SqlType, Schema> SQL_TO_LOGICAL = ImmutableBiMap.<SqlType, Schema>builder()
.put(SqlTypes.BOOLEAN, CONNECT_BOOLEAN_SCHEMA)
.put(SqlTypes.INTEGER, CONNECT_INTEGER_SCHEMA)
.put(SqlTypes.BIGINT, CONNECT_BIGINT_SCHEMA)
.put(SqlTypes.DOUBLE, CONNECT_DOUBLE_SCHEMA)
.put(SqlTypes.STRING, CONNECT_STRING_SCHEMA)
.put(SqlTypes.TIMESTAMP, CONNECT_TIMESTAMP_SCHEMA)
.put(SqlArray.of(SqlTypes.INTEGER), SchemaBuilder
.array(Schema.OPTIONAL_INT32_SCHEMA)
.optional()
Expand Down Expand Up @@ -100,6 +104,7 @@ public class SchemaConvertersTest {
.put(SqlBaseType.ARRAY, List.class)
.put(SqlBaseType.MAP, Map.class)
.put(SqlBaseType.STRUCT, Struct.class)
.put(SqlBaseType.TIMESTAMP, Timestamp.class)
.build();

private static final BiMap<SqlType, ParamType> SQL_TO_FUNCTION = ImmutableBiMap
Expand All @@ -109,6 +114,7 @@ public class SchemaConvertersTest {
.put(SqlTypes.BIGINT, ParamTypes.LONG)
.put(SqlTypes.DOUBLE, ParamTypes.DOUBLE)
.put(SqlTypes.STRING, ParamTypes.STRING)
.put(SqlTypes.TIMESTAMP, ParamTypes.TIMESTAMP)
.put(SqlArray.of(SqlTypes.INTEGER), ArrayType.of(ParamTypes.INTEGER))
.put(SqlDecimal.of(2, 1), ParamTypes.DECIMAL)
.put(
Expand Down Expand Up @@ -280,6 +286,12 @@ public void shouldConvertJavaStringToSqlString() {
is(SqlBaseType.STRING));
}

@Test
public void shouldConvertJavaStringToSqlTimestamp() {
assertThat(javaToSqlConverter().toSqlType(Timestamp.class),
is(SqlBaseType.TIMESTAMP));
}

@Test
public void shouldThrowOnUnknownJavaType() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,20 @@ public void shouldVisitDecimal() {
assertThat(result, is("Expected"));
}

@Test
public void shouldVisitTimestamp() {
// Given:
final SqlPrimitiveType type = SqlTypes.TIMESTAMP;
when(visitor.visitTimestamp(any())).thenReturn("Expected");

// When:
final String result = SqlTypeWalker.visit(type, visitor);

// Then:
verify(visitor).visitTimestamp(same(type));
assertThat(result, is("Expected"));
}

@Test
public void shouldVisitArray() {
// Given:
Expand Down Expand Up @@ -328,7 +342,8 @@ public static Stream<SqlType> primitiveTypes() {
SqlTypes.INTEGER,
SqlTypes.BIGINT,
SqlTypes.DOUBLE,
SqlTypes.STRING
SqlTypes.STRING,
SqlTypes.TIMESTAMP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public void shouldIncludeRequiredFormatInErrorMessage() {
+ "with an optional numeric 4-digit timezone"));
}

@Test
public void shouldParseToTimestamp() {
assertThat(parser.parseToTimestamp("2017-11-13T23:59:58.999-0100").getTime(), is(1510621198999L));
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test to parse a time without timezone. Btw, are we supporting this? or what is the behavior we discussed when using timezone values in the a string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conclusion was to include a conversion function and to not store timezone information. Timezones in strings were not specified in the klip, but I think it makes sense to support it because this is basically the same thing as calling CONVERT_TZ("2017-11-13T23:59:58.999","-0100"), and it is something we already support in some filter expressions.

private static long fullParse(final String text) {
return FULL_PARSER.parse(text);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.MAP;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.STRING;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.STRUCT;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.TIMESTAMP;
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableMap;
Expand All @@ -40,6 +41,7 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -295,6 +297,7 @@ private static final class Rules {
.put(key(DOUBLE, DOUBLE), Coercer.PASS_THROUGH)
// STRING:
.put(key(STRING, STRING), Coercer.PASS_THROUGH)
.put(key(STRING, TIMESTAMP), parser((v, t) -> SqlTimestamps.parseTimestamp(v)))
// ARRAY:
.put(key(ARRAY, ARRAY), coercer(
DefaultSqlValueCoercer::canCoerceToArray,
Expand All @@ -310,6 +313,8 @@ private static final class Rules {
DefaultSqlValueCoercer::canCoerceToStruct,
DefaultSqlValueCoercer::coerceToStruct
))
// TIMESTAMP:
.put(key(TIMESTAMP, TIMESTAMP), Coercer.PASS_THROUGH)
.build();

private static final ImmutableMap<SupportedCoercion, Coercer> LAX_ADDITIONAL =
Expand All @@ -332,6 +337,9 @@ private static final class Rules {
.put(key(STRING, DECIMAL), parser((v, t) -> DecimalUtil
.ensureFit(new BigDecimal(v), (SqlDecimal) t)))
.put(key(STRING, DOUBLE), parser((v, t) -> SqlDoubles.parseDouble(v)))
// TIMESTAMP:
.put(key(TIMESTAMP, STRING), coercer((c, v, t)
-> Result.of(SqlTimestamps.formatTimestamp((Timestamp) v))))
.build();

private static Coercer parser(final BiFunction<String, SqlType, Object> parserFunction) {
Expand Down
Loading