feat: enhance PRINT TOPIC's format detection (#4551)
* feat: enhance `PRINT TOPIC`'s format detection

fixes: #4258

With this change, `PRINT TOPIC` has enhanced detection of the key and value formats of a topic.
The command starts with a list of known formats for both the key and the value and refines this
list as it sees more data.

As the list of possible formats is refined over time, the command outputs the reduced list.
For example, you may see output such as:

```
ksql> PRINT some_topic FROM BEGINNING;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {"statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window  TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
Key format: KAFKA_STRING
...
```

In the last line of the above output, the command has narrowed down the key format as it has processed more data.

The command has also been updated to detect only valid UTF-8 encoded text as type `JSON` or `KAFKA_STRING`.
This is in line with how KSQL would later deserialize the data.

If no known format can successfully deserialize the data, it is printed as a combination of ASCII characters and hex-encoded bytes.
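
The following is a minimal, hypothetical sketch of the approach described above (it is not ksqlDB's actual `RecordFormatter`, and all names are illustrative): keep a list of candidate formats, drop any candidate a record rules out, treat only valid UTF-8 text as `JSON` or `KAFKA_STRING`, and fall back to mixed ASCII/hex output when nothing matches.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of progressive format detection; not ksqlDB's actual implementation. */
public class FormatDetectionSketch {

  /** A candidate format and a test for whether some bytes could still be that format. */
  record Candidate(String name, Predicate<byte[]> couldBe) {}

  private final List<Candidate> candidates = new ArrayList<>(List.of(
      new Candidate("JSON", FormatDetectionSketch::looksLikeJson),
      new Candidate("KAFKA_STRING", FormatDetectionSketch::isValidUtf8)
      // ... windowed variants, KAFKA_BIGINT, KAFKA_DOUBLE, etc. would also start in the list
  ));

  /** Drop every candidate the latest record rules out and return what remains. */
  public List<String> processRecord(byte[] bytes) {
    candidates.removeIf(c -> !c.couldBe().test(bytes));
    return candidates.stream().map(Candidate::name).toList();
  }

  /** Only valid UTF-8 text is considered JSON or KAFKA_STRING. */
  private static boolean isValidUtf8(byte[] bytes) {
    try {
      StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(bytes));
      return true;
    } catch (CharacterCodingException e) {
      return false;
    }
  }

  private static boolean looksLikeJson(byte[] bytes) {
    if (!isValidUtf8(bytes)) {
      return false;
    }
    final String text = new String(bytes, StandardCharsets.UTF_8).trim();
    return text.startsWith("{") || text.startsWith("[") || text.startsWith("\"");
  }

  /** Fallback when no format matches: printable ASCII as-is, everything else hex-encoded. */
  static String asciiAndHex(byte[] bytes) {
    final StringBuilder sb = new StringBuilder();
    for (final byte b : bytes) {
      if (b >= 32 && b < 127) {
        sb.append((char) b);
      } else {
        sb.append(String.format("\\x%02X", b));
      }
    }
    return sb.toString();
  }
}
```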

(cherry picked from commit a3fae28)
big-andy-coates committed Feb 20, 2020
1 parent fcd6c7f commit 8b19bc6
Showing 17 changed files with 1,483 additions and 1,035 deletions.
11 changes: 9 additions & 2 deletions docs-md/developer-guide/create-a-stream.md
@@ -282,8 +282,8 @@ PRINT pageviews_intro;
Your output should resemble:

```
Key format: KAFKA (BIGINT or DOUBLE)
Value format: KAFKA (STRING)
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
rowtime: 10/30/18 10:15:51 PM GMT, key: 294851, value: 1540937751186,User_8,Page_12
rowtime: 10/30/18 10:15:55 PM GMT, key: 295051, value: 1540937755255,User_1,Page_15
rowtime: 10/30/18 10:15:57 PM GMT, key: 295111, value: 1540937757265,User_8,Page_10
@@ -298,6 +298,13 @@ Press Ctrl+C to stop printing the stream.
!!! note
The query continues to run after you stop printing the stream.

!!! note
ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
ksqlDB can't narrow this down further, because it isn't possible to rule out
either format just by inspecting the key's serialized bytes. In this case, we know the key is
a `BIGINT`. In other cases, you may know the key type, or you may need to ask the author
of the data.

Use the SHOW QUERIES statement to view the query that ksqlDB created for
the `pageviews_intro` stream:

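As an aside, the ambiguity described in the note above is easy to demonstrate: any 8 serialized bytes decode successfully as both a `BIGINT` (long) and a `DOUBLE`, so the bytes alone can't settle which format was used. A small, self-contained illustration (not part of this change):

```java
import java.nio.ByteBuffer;

public class KeyFormatAmbiguity {
  public static void main(String[] args) {
    // Serialize a BIGINT key, e.g. 294851, as Kafka's LongSerializer would: 8 big-endian bytes.
    byte[] keyBytes = ByteBuffer.allocate(8).putLong(294851L).array();

    // The same 8 bytes also decode without error as a DOUBLE, so neither
    // format can be ruled out just by inspecting the serialized key.
    long asBigint = ByteBuffer.wrap(keyBytes).getLong();
    double asDouble = ByteBuffer.wrap(keyBytes).getDouble();

    System.out.println("As BIGINT: " + asBigint);   // 294851
    System.out.println("As DOUBLE: " + asDouble);   // a valid, but tiny, double
  }
}
```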
2 changes: 1 addition & 1 deletion docs-md/developer-guide/create-a-table.md
@@ -218,7 +218,7 @@ PRINT users_female;
Your output should resemble:

```
Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: JSON
rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"}
10 changes: 8 additions & 2 deletions docs-md/developer-guide/ksqldb-reference/print.md
@@ -55,13 +55,19 @@ the key and value formats at the top of the output.
Your output should resemble:

```
Key format: KAFKA (INTEGER)
Value format: JSON
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {"statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
^CTopic printing ceased
```

The key format for this topic is `KAFKA_STRING`. However, the `PRINT` command doesn't know this and
has attempted to determine the format of the key by inspecting the data. It has concluded that the
format may be `KAFKA_STRING`, but that it could also be `JSON` or a windowed `KAFKA_STRING`.

The value format for this topic is `JSON`. However, the `PRINT` command has also determined it could
be `KAFKA_STRING`, because `JSON` is serialized as text. You could therefore choose to deserialize
this value data as a `KAFKA_STRING`, but `JSON` is likely the better option.

Page last revised on: {{ git_revision_date }}
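
To illustrate the point made above about `JSON` being serialized as text, the same bytes can be read back either way. A minimal, stand-alone example using the Jackson library (illustrative only, not part of this change):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;

public class JsonOrString {
  public static void main(String[] args) throws Exception {
    byte[] valueBytes = "{\"USERID\":\"User_5\"}".getBytes(StandardCharsets.UTF_8);

    // The bytes parse as JSON ...
    JsonNode asJson = new ObjectMapper().readTree(valueBytes);

    // ... and, since JSON is just UTF-8 text, they also decode as a plain string.
    String asString = new String(valueBytes, StandardCharsets.UTF_8);

    System.out.println("JSON field: " + asJson.get("USERID").asText()); // User_5
    System.out.println("As string:  " + asString);
  }
}
```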
4 changes: 2 additions & 2 deletions docs-md/developer-guide/query-with-structured-data.md
@@ -144,8 +144,8 @@ PRINT 'raw-topic' FROM BEGINNING;
Your output should resemble:

```json
Key format: KAFKA (STRING)
Value format: JSON
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: 1, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
rowtime: 12/21/18 23:58:42 PM PSD, key: 2, value: {"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
rowtime: 12/21/18 23:58:42 PM PSD, key: 3, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
13 changes: 10 additions & 3 deletions docs-md/tutorials/basics-docker.md
@@ -184,7 +184,7 @@ PRINT users;
Your output should resemble:

```json
Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
@@ -204,8 +204,8 @@ PRINT pageviews;
Your output should resemble:

```
Key format: KAFKA (INTEGER)
Format: KAFKA (STRING)
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
@@ -215,6 +215,13 @@ Your output should resemble:

Press Ctrl+C to stop printing messages.

!!! note
ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
ksqlDB can't narrow this down further, because it isn't possible to rule out
either format just by inspecting the key's serialized bytes. In this case, we know the key is
a `BIGINT`. In other cases, you may know the key type, or you may need to ask the author
of the data.

For more information, see [ksqlDB Syntax Reference](../developer-guide/syntax-reference.md).

Create a Stream and Table
13 changes: 10 additions & 3 deletions docs-md/tutorials/basics-local.md
@@ -162,7 +162,7 @@ PRINT users;
Your output should resemble:
```json
Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
@@ -182,8 +182,8 @@ PRINT pageviews;
Your output should resemble:
```
Key format: KAFKA (INTEGER)
Format: KAFKA (STRING)
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
@@ -193,6 +193,13 @@ Topic printing ceased
Press Ctrl+C to stop printing messages.
!!! note
ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
ksqlDB can't narrow this down further, because it isn't possible to rule out
either format just by inspecting the key's serialized bytes. In this case, we know the key is
a `BIGINT`. In other cases, you may know the key type, or you may need to ask the author
of the data.
For more information, see [KSQL Syntax Reference](../developer-guide/syntax-reference.md).
Create a Stream and Table
6 changes: 3 additions & 3 deletions docs/includes/ksql-includes.rst
@@ -144,7 +144,7 @@ Your output should resemble:

::

Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
@@ -164,8 +164,8 @@ Your output should resemble:

::

Key format: KAFKA (INTEGER)
Format: KAFKA (STRING)
Key format: KAFKA_INTEGER
Format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
10 changes: 5 additions & 5 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
@@ -402,13 +402,13 @@ public void shouldPrintTopicWithJsonValue() {

// Then:
assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: JSON"));
assertThat(terminal.getOutputString(), containsString("Key format: KAFKA (BIGINT or DOUBLE)"));
assertThat(terminal.getOutputString(), containsString("Key format: KAFKA_BIGINT or KAFKA_DOUBLE"));
assertThat(terminal.getOutputString(), containsString(","
+ " key: 1, "
+ "value: {"
+ "\"ORDERTIME\":1,"
+ "\"ORDERID\":" + "\"ORDER_1\","
+ "\"ITEMID\":" + "\"ITEM_1\","
+ "\"ORDERID\":\"ORDER_1\","
+ "\"ITEMID\":\"ITEM_1\","
+ "\"ORDERUNITS\":10.0,"
+ "\"TIMESTAMP\":\"2018-01-01\","
+ "\"PRICEARRAY\":[100.0,110.99,90.0],"
@@ -422,8 +422,8 @@ public void shouldPrintTopicWithDelimitedValue() {
run("print " + DELIMITED_TOPIC + " FROM BEGINNING INTERVAL 1 LIMIT 2;", localCli);

// Then:
assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: KAFKA (STRING)"));
assertThat(terminal.getOutputString(), containsString("Key format: KAFKA (STRING)"));
assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: KAFKA_STRING"));
assertThat(terminal.getOutputString(), containsString("Key format: KAFKA_STRING"));
assertThat(terminal.getOutputString(), containsString(", key: <null>, value: <null>"));
assertThat(terminal.getOutputString(), containsString(", key: ITEM_1, value: ITEM_1,home cinema"));
}
@@ -25,7 +25,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber;
import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter;
import io.confluent.ksql.services.ServiceContext;
import java.math.RoundingMode;
import java.time.Duration;