-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: timestamp support - casting, comparisons and serde #6806
Conversation
ksqldb-common/src/main/java/io/confluent/ksql/function/types/TimestampType.java
Outdated
Show resolved
Hide resolved
@@ -62,13 +63,16 @@ | |||
private static final Schema CONNECT_BIGINT_SCHEMA = SchemaBuilder.int64().optional().build(); | |||
private static final Schema CONNECT_DOUBLE_SCHEMA = SchemaBuilder.float64().optional().build(); | |||
private static final Schema CONNECT_STRING_SCHEMA = SchemaBuilder.string().optional().build(); | |||
private static final Schema CONNECT_TIMESTAMP_SCHEMA = | |||
SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp").version(1).optional().build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is version 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a part of the schema from the Timestamp class. This ended up being replaced by the builder from the Timestamp class though.
ksqldb-engine-common/src/main/java/io/confluent/ksql/schema/ksql/SqlTimestamps.java
Outdated
Show resolved
Hide resolved
+ "Partials are also supported, for example \"2020-05-26\""; | ||
|
||
private static final StringToTimestampParser PARSER = | ||
new StringToTimestampParser(KsqlConstants.DATE_TIME_PATTERN); | ||
|
||
public Timestamp parseToTimestamp(final String text) { | ||
return new Timestamp(parse(text)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parse
is gonna parse strings with timezones if the string contains +0200
for instance. Isn't this going to cause an issue?
ksqldb-engine/src/test/java/io/confluent/ksql/engine/generic/GenericExpressionResolverTest.java
Show resolved
Hide resolved
...b-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/CastEvaluatorTest.java
Outdated
Show resolved
Hide resolved
final Long longVal = Long.valueOf(spec.toString()); | ||
if (Timestamp.LOGICAL_NAME.equals(schema.name())) { | ||
return Timestamp.toLogical(schema, (Long) spec); | ||
return new java.sql.Timestamp(longVal); | ||
} | ||
return Long.valueOf(spec.toString()); | ||
return longVal; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need changes in connectToSpec
?
I see this code in that method:
case INT64:
if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
return Timestamp.fromLogical(schema, (Date) data);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connectToSpec
is used for checking the values in streams in functional tests. These values are verified against values in a json file and since json files cannot store timestamp objects, comparing the millis is the only way to check timestamp values.
ksqldb-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializer.java
Outdated
Show resolved
Hide resolved
ksqldb-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonSerializerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jzaralim. The code looks good to me. I'll run some tests on it once is merged and the rest of the changes is implemented.
Description
First pass at adding timestamp type support (#4148). This PR includes casting between strings and timestamps, comparisons and JSON/Avro/Delimited serde.
It looks like a lot of files, but most are tests or changes that involve adding timestamp to a list of types. All the other files can be broken down as follows:
Parsing
Casting
Comparisons
Serde
Not included in this PR:
google.protobuf.Timestamp
is registered in Schema Registry.Testing done
QTT and unit tests
Reviewer checklist