Skip to content

Commit

Permalink
feat: Add support for TIMESTAMP type in the WITH/TIMESTAMP property (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr authored Oct 26, 2021
1 parent 3edc18a commit ecb43e2
Show file tree
Hide file tree
Showing 17 changed files with 615 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ The WITH clause for the SELECT result supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number of partitions of the input stream/table will be used. In join queries, the property values are taken from the left-most stream or table. You can't change the number of partitions on a stream. To change the partition count, you must drop the stream and create it again. |
| REPLICAS | The replication factor for the topic. If this property is not set, then the number of replicas of the input stream or table will be used. In join queries, the property values are taken from the left-most stream or table. |
| TIMESTAMP | Sets a column within this stream's schema to be used as the default source of `ROWTIME` for any downstream queries. Downstream queries that use time-based operations, such as windowing, will process records in this stream based on the timestamp in this column. The column will be used to set the timestamp on any records emitted to Kafka. Timestamps have a millisecond accuracy. If not supplied, the `ROWTIME` of the source stream is used. <br>**Note**: This doesn't affect the processing of the query that populates this stream. For example, given the following statement:<br><pre>CREATE STREAM foo WITH (TIMESTAMP='t2') AS<br>&#0009;SELECT * FROM bar<br>&#0009;WINDOW TUMBLING (size 10 seconds);<br>&#0009;EMIT CHANGES;</pre>The window into which each row of `bar` is placed is determined by bar's `ROWTIME`, not `t2`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type `bigint`. When set, the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type `bigint` or `timestamp`. When set, the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are serialized where the values schema contains only a single column. This setting controls how the query serializes values with a single-column schema.<br>If set to `true`, ksqlDB serializes the column as a named column within a record.<br>If set to `false`, ksqlDB serializes the column as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](/reference/server-configuration#ksqlpersistencewrapsinglevalues), then the format's default is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](/reference/serialization#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, results in an error. |


Expand Down
2 changes: 1 addition & 1 deletion docs/developer-guide/ksqldb-reference/create-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ following properties:
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| TIMESTAMP | By default, the pseudo `ROWTIME` column is the timestamp of the record in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified column within the Kafka message (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set the timestamp column must be of type `bigint`. If it is set, then the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type `bigint` or `timestamp`. If it is set, then the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single column. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single column.<br>If set to `true`, ksqlDB expects the column to have been serialized as a named column within a record.<br>If set to `false`, ksqlDB expects the column to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](/reference/server-configuration#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](/reference/serialization#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, will result in an error. |
| WINDOW_TYPE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a `WINDOW` clause, then the `WINDOW_TYPE` property can be used to provide the window type. Valid values are `SESSION`, `HOPPING`, and `TUMBLING`. |
| WINDOW_SIZE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a `WINDOW` clause, and the `WINDOW_TYPE` property is TUMBLING or HOPPING, then the WINDOW_SIZE property should be set. The property is a string with two literals, window size (a number) and window size unit (a time unit). For example: `10 SECONDS`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ following properties:
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number of partitions of the input stream/table will be used. In join queries, the property values are taken from the left-most stream or table. You can't change the number of partitions on a table. To change the partition count, you must drop the table and create it again. |
| REPLICAS | The replication factor for the topic. If this property is not set, then the number of replicas of the input stream or table will be used. In join queries, the property values are taken from the left-most stream or table. |
| TIMESTAMP | Sets a column within this stream's schema to be used as the default source of `ROWTIME` for any downstream queries. Downstream queries that use time-based operations, such as windowing, will process records in this stream based on the timestamp in this column. The column will be used to set the timestamp on any records emitted to Kafka. Timestamps have a millisecond accuracy. If not supplied, the `ROWTIME` of the source stream is used. <br>**Note**: This doesn't affect the processing of the query that populates this stream. For example, given the following statement:<br><pre>CREATE STREAM foo WITH (TIMESTAMP='t2') AS<br>&#0009;SELECT * FROM bar<br>&#0009;WINDOW TUMBLING (size 10 seconds);<br>&#0009;EMIT CHANGES;</pre>The window into which each row of `bar` is placed is determined by bar's `ROWTIME`, not `t2`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set the timestamp column must be of type `bigint`. If it is set, then the TIMESTAMP column must be of type varchar and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type `bigint` or `timestamp`. If it is set, then the TIMESTAMP column must be of type varchar and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are serialized where the values schema contains only a single column. The setting controls how the query will serialize values with a single-column schema.<br>If set to `true`, ksqlDB will serialize the column as a named column within a record.<br>If set to `false`, ksqlDB will serialize the column as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](/reference/server-configuration#ksqlpersistencewrapsinglevalues), then the format's default is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](/reference/serialization#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, will result in an error. |


Expand Down
2 changes: 1 addition & 1 deletion docs/developer-guide/ksqldb-reference/create-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ following properties:
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| TIMESTAMP | By default, the pseudo `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified column within the Kafka message (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set the timestamp column must be of type `bigint`. If it is set, then the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type `bigint` or `timestamp`. If it is set, then the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the values schema contains only a single column. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single column.<br>If set to `true`, ksqlDB expects the column to have been serialized as named column within a record.<br>If set to `false`, ksqlDB expects the column to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](/reference/server-configuration#ksqlpersistencewrapsinglevalues), then the format's default is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](/reference/serialization#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, will result in an error. |
| WINDOW_TYPE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e. was created using ksqlDB using a query that contains a `WINDOW` clause, then the `WINDOW_TYPE` property can be used to provide the window type. Valid values are `SESSION`, `HOPPING`, and `TUMBLING`. |
| WINDOW_SIZE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a `WINDOW` clause, and the `WINDOW_TYPE` property is TUMBLING or HOPPING, then the WINDOW_SIZE property should be set. The property is a string with two literals, window size (a number) and window size unit (a time unit). For example: `10 SECONDS`. |
Expand Down
72 changes: 70 additions & 2 deletions docs/how-to-guides/use-a-custom-timestamp-column.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ FROM s1
EMIT CHANGES;
```

Your results should look similiar to what is below with the exception of `ROWTIME` and `ROWTIME_FORMATTED`, which will mirror your wall clock. Because you didn't yet instruct ksqlDB to use event-time, `ROWTIME` is inherited from the underlying Kafka record. Kafka's default is to set the timestamp at which the record was produced to the topic.
Your results should look similar to what is below with the exception of `ROWTIME` and `ROWTIME_FORMATTED`, which will mirror your wall clock. Because you didn't yet instruct ksqlDB to use event-time, `ROWTIME` is inherited from the underlying Kafka record. Kafka's default is to set the timestamp at which the record was produced to the topic.

```
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
Expand Down Expand Up @@ -193,7 +193,7 @@ INSERT INTO s4 (
);
```

And run a similiar query as above. Remember to set `auto.offset.reset` to `earliest` if you haven't yet.
And run a similar query as above. Remember to set `auto.offset.reset` to `earliest` if you haven't yet.

```sql
SELECT k,
Expand All @@ -217,3 +217,71 @@ The query should return the following results.
|k2 |1588509000000 |1588509000000 |2020-05-03 12:30:00.000 |2020-05-03 12:30:00.000 |1 |b |
|k3 |1588736700000 |1588736700000 |2020-05-06 03:45:00.000 |2020-05-06 03:45:00.000 |2 |c |
```


## Timestamps represented by TIMESTAMP columns

Similar to the previous section, you can also use columns of type `TIMESTAMP`. These columns require data to be in `yyyy-mm-ddThh:mm:ss[.S]` format, so there is no need to provide the `timestamp_format` property.

Create a stream `s5` with a column `ts` of type `TIMESTAMP`.

```sql
CREATE STREAM s5 (
k VARCHAR KEY,
ts TIMESTAMP,
v1 INT,
v2 VARCHAR
) WITH (
kafka_topic = 's5',
partitions = 1,
value_format = 'avro',
timestamp = 'ts'
);
```

Insert some rows with timestamps.


```sql
INSERT INTO s5 (
k, ts, v1, v2
) VALUES (
'k1', '2019-07-09T01:00', 0, 'a'
);

INSERT INTO s5 (
k, ts, v1, v2
) VALUES (
'k2', '2020-05-03T12:30:00', 1, 'b'
);

INSERT INTO s5 (
k, ts, v1, v2
) VALUES (
'k3', '2020-05-06T03:45', 2, 'c'
);
```

And run the following query. Remember to set `auto.offset.reset` to `earliest` if you haven't yet.


```sql
SELECT k,
ROWTIME,
ts,
v1,
v2
FROM s5
EMIT CHANGES;
```

The query should return the following results.

```sql
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|K |ROWTIME |TS |V1 |V2 |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|k1 |1562634000000 |2019-07-09T01:00:00.000 |0 |a |
|k2 |1588509000000 |2020-05-03T12:30:00.000 |1 |b |
|k3 |1588736700000 |2020-05-06T03:45:00.000 |2 |c |
```
6 changes: 3 additions & 3 deletions docs/reference/sql/time.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ the specified column. Define the format of a record's timestamp by
using the TIMESTAMP_FORMAT property.

If you use the TIMESTAMP property but don't set TIMESTAMP_FORMAT, ksqlDB
assumes that the timestamp field is a `bigint`. If you set
TIMESTAMP_FORMAT, the TIMESTAMP field must be of type `varchar` and
have a format that the `DateTimeFormatter` Java class can parse.
assumes that the timestamp field is either a `bigint` or a `timestamp`.
If you set TIMESTAMP_FORMAT, the TIMESTAMP field must be of type `varchar`
and have a format that the `DateTimeFormatter` Java class can parse.

If your timestamp format has embedded single quotes, you can escape them
by using two successive single quotes, `''`. For example, to escape
Expand Down
Loading

0 comments on commit ecb43e2

Please sign in to comment.