From a31a8d4f3068bf8ab92c4fb574bda75353f8c30b Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 3 Oct 2019 13:34:03 -0700 Subject: [PATCH 1/2] docs: start updating docs to include `EMIT CHANGES` for push queries --- docs/includes/ksql-includes.rst | 155 +++++++++++++++++++------------- 1 file changed, 92 insertions(+), 63 deletions(-) diff --git a/docs/includes/ksql-includes.rst b/docs/includes/ksql-includes.rst index 5375d446fba2..26dd8538257b 100644 --- a/docs/includes/ksql-includes.rst +++ b/docs/includes/ksql-includes.rst @@ -138,7 +138,7 @@ Inspect the ``users`` topic by using the PRINT statement: .. code:: sql - PRINT 'users'; + PRINT users; Your output should resemble: @@ -157,7 +157,7 @@ Inspect the ``pageviews`` topic by using the PRINT statement: .. code:: sql - PRINT 'pageviews'; + PRINT pageviews LIMIT 3; Your output should resemble: @@ -167,12 +167,8 @@ Your output should resemble: 10/23/18 12:24:03 AM UTC , 9461 , 1540254243183,User_9,Page_20 10/23/18 12:24:03 AM UTC , 9471 , 1540254243617,User_7,Page_47 10/23/18 12:24:03 AM UTC , 9481 , 1540254243888,User_4,Page_27 - ^C10/23/18 12:24:05 AM UTC , 9521 , 1540254245161,User_9,Page_62 - Topic printing ceased ksql> -Press CTRL+C to stop printing messages. - For more information, see :ref:`ksql_syntax_reference`. .. inspect_topics_end @@ -259,16 +255,19 @@ the latest offset. .. code:: sql - SELECT pageid FROM pageviews_original LIMIT 3; + SELECT pageid FROM pageviews_original EMIT CHANGES LIMIT 3; Your output should resemble: :: - Page_24 - Page_73 - Page_78 - LIMIT reached + +-----------------+ + |PAGEID | + +-----------------+ + |Page_63 | + |Page_44 | + |Page_59 | + Limit Reached Query terminated #. Create a persistent query by using the ``CREATE STREAM`` keywords to precede the ``SELECT`` statement. The results from this @@ -281,16 +280,18 @@ the latest offset. SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original - ON pageviews_original.userid = users_original.userid; + ON pageviews_original.userid = users_original.userid + EMIT CHANGES; Your output should resemble: :: Message - ---------------------------- - Stream created and running - ---------------------------- + ---------------------------------------------------------------------------------------------------------- + Stream PAGEVIEWS_ENRICHED created and running. Created by query with query ID: CSAS_PAGEVIEWS_ENRICHED_4 + ---------------------------------------------------------------------------------------------------------- + .. tip:: You can run ``DESCRIBE pageviews_enriched;`` to describe the stream. @@ -299,15 +300,18 @@ the latest offset. .. code:: sql - SELECT * FROM pageviews_enriched; + SELECT * FROM pageviews_enriched EMIT CHANGES; Your output should resemble: :: - 1519746861328 | User_4 | User_4 | Page_58 | Region_5 | OTHER - 1519746861794 | User_9 | User_9 | Page_94 | Region_9 | MALE - 1519746862164 | User_1 | User_1 | Page_90 | Region_7 | FEMALE + +---------------------+-----------+--------+---------+----------+----------+ + |ROWTIME |ROWKEY |USERID |PAGEID |REGIONID |GENDER | + +---------------------+-----------+--------+---------+----------+----------+ + |1570059294609 |User_7 |User_7 |Page_80 |Region_3 |FEMALE | + |1570059294864 |User_7 |User_7 |Page_19 |Region_3 |FEMALE | + |1570059295004 |User_1 |User_1 |Page_27 |Region_7 |OTHER | ^CQuery terminated #. Create a new persistent query where a condition limits the streams content, using ``WHERE``. 
Results from this query @@ -317,16 +321,17 @@ the latest offset. CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched - WHERE gender = 'FEMALE'; + WHERE gender = 'FEMALE' + EMIT CHANGES; Your output should resemble: :: Message - ---------------------------- - Stream created and running - ---------------------------- + ------------------------------------------------------------------------------------------------------ + Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_5 + ------------------------------------------------------------------------------------------------------ .. tip:: You can run ``DESCRIBE pageviews_female;`` to describe the stream. @@ -338,62 +343,85 @@ the latest offset. CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female - WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'; + WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' + EMIT CHANGES; Your output should resemble: :: Message - ---------------------------- - Stream created and running - ---------------------------- + ---------------------------------------------------------------------------------------------------------------------- + Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_6 + ---------------------------------------------------------------------------------------------------------------------- -#. Create a new persistent query that counts the pageviews for each region and gender combination in a - :ref:`tumbling window ` of 30 seconds when the count is greater than one. Results from this query - are written to the ``PAGEVIEWS_REGIONS`` Kafka topic in the Avro format. KSQL will register the Avro schema with the - configured |sr| when it writes the first message to the ``PAGEVIEWS_REGIONS`` topic. +#. Create a new persistent query that counts the pageviews for each region in a :ref:`tumbling window ` + of 30 seconds when the count is greater than one. Results from this query are written to the ``PAGEVIEWS_REGIONS`` + Kafka topic in the Avro format. KSQL will register the Avro schema with the configured |sr| when it writes the first + message to the ``PAGEVIEWS_REGIONS`` topic. .. code:: sql CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS - SELECT gender, regionid , COUNT(*) AS numusers + SELECT regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) - GROUP BY gender, regionid - HAVING COUNT(*) > 1; + GROUP BY regionid + HAVING COUNT(*) > 1 + EMIT CHANGES; Your output should resemble: :: Message - --------------------------- - Table created and running - --------------------------- + ------------------------------------------------------------------------------------------------------- + Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_7 + ------------------------------------------------------------------------------------------------------- .. tip:: You can run ``DESCRIBE pageviews_regions;`` to describe the table. -#. Optional: View results from the above queries using ``SELECT``. +#. Optional: View the changes to the above table in realtime using ``SELECT``. .. 
code:: sql - SELECT gender, regionid, numusers FROM pageviews_regions LIMIT 5; + SELECT regionid, numusers FROM pageviews_regions EMIT CHANGES LIMIT 5; Your output should resemble: :: - FEMALE | Region_6 | 3 - FEMALE | Region_1 | 4 - FEMALE | Region_9 | 6 - MALE | Region_8 | 2 - OTHER | Region_5 | 4 + +-------------+-----------+ + |REGIONID |NUMUSERS | + +-------------+-----------+ + |Region_4 |2 | + |Region_1 |6 | + |Region_3 |3 | + |Region_5 |2 | + |Region_2 |2 | LIMIT reached Query terminated ksql> +#. Optional: Query the materialized table for the values stored within KSQL for a specific key: + +.. code:: sql + + SELECT * FROM pageviews_regions WHERE ROWKEY='Region_4'; + + Your output should resemble: + + :: + + ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | REGIONID STRING | NUMUSERS BIGINT + -------------------------------------------------------------------------------- + Region_4 | 1570133790000 | Region_1 | 6 + Region_4 | 1570133820000 | Region_1 | 14 + Region_4 | 1570133850000 | Region_1 | 16 + Region_4 | 1570133880000 | Region_1 | 14 + Region_4 | 1570133910000 | Region_1 | 16 + #. Optional: Show all persistent queries. .. code:: sql @@ -406,10 +434,10 @@ the latest offset. Query ID | Kafka Topic | Query String -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - CSAS_PAGEVIEWS_FEMALE_1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE'; - CTAS_PAGEVIEWS_REGIONS_3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1; - CSAS_PAGEVIEWS_FEMALE_LIKE_89_2 | PAGEVIEWS_FEMALE_LIKE_89 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'; - CSAS_PAGEVIEWS_ENRICHED_0 | PAGEVIEWS_ENRICHED | CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid; + CSAS_PAGEVIEWS_FEMALE_1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES; + CTAS_PAGEVIEWS_REGIONS_3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1 EMIT CHANGES; + CSAS_PAGEVIEWS_FEMALE_LIKE_89_2 | PAGEVIEWS_FEMALE_LIKE_89 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' EMIT CHANGES; + CSAS_PAGEVIEWS_ENRICHED_0 | PAGEVIEWS_ENRICHED | CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid EMIT CHANGES; 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- For detailed information on a Query run: EXPLAIN ; @@ -426,24 +454,23 @@ the latest offset. Name : PAGEVIEWS_REGIONS Type : TABLE - Key field : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1 + Key field : REGIONID Key format : STRING Timestamp field : Not set - using Value format : AVRO - Kafka topic : PAGEVIEWS_REGIONS (partitions: 4, replication: 1) + Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1) Field | Type -------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) - GENDER | VARCHAR(STRING) REGIONID | VARCHAR(STRING) NUMUSERS | BIGINT -------------------------------------- Queries that write into this TABLE ----------------------------------- - CTAS_PAGEVIEWS_REGIONS_3 : CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1; + CTAS_PAGEVIEWS_REGIONS_3 : CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT regionid, COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY regionid HAVING COUNT(*) > 1 EMIT CHANGES; For query topology and execution plan please run: EXPLAIN @@ -576,7 +603,7 @@ Query the data, using ``->`` notation to access the Struct contents: .. code:: sql - SELECT ORDERID, ADDRESS->CITY FROM ORDERS; + SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES; Your output should resemble: @@ -641,7 +668,7 @@ For the ``NEW_ORDERS`` topic, run: .. code:: sql - SELECT ORDER_ID, TOTAL_AMOUNT, CUSTOMER_NAME FROM NEW_ORDERS LIMIT 3; + SELECT ORDER_ID, TOTAL_AMOUNT, CUSTOMER_NAME FROM NEW_ORDERS EMIT CHANGES LIMIT 3; Your output should resemble: @@ -655,7 +682,7 @@ For the ``SHIPMENTS`` topic, run: .. code:: sql - SELECT ORDER_ID, SHIPMENT_ID, WAREHOUSE FROM SHIPMENTS LIMIT 2; + SELECT ORDER_ID, SHIPMENT_ID, WAREHOUSE FROM SHIPMENTS EMIT CHANGES LIMIT 2; Your output should resemble: @@ -674,7 +701,8 @@ based on a join window of 1 hour. FROM NEW_ORDERS O INNER JOIN SHIPMENTS S WITHIN 1 HOURS - ON O.ORDER_ID = S.ORDER_ID; + ON O.ORDER_ID = S.ORDER_ID + EMIT CHANGES; Your output should resemble: @@ -750,7 +778,7 @@ Inspect the WAREHOUSE_LOCATION table: .. code:: sql - SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION LIMIT 3; + SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION EMIT CHANGES LIMIT 3; Your output should resemble: @@ -766,7 +794,7 @@ Inspect the WAREHOUSE_SIZE table: .. code:: sql - SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE LIMIT 3; + SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE EMIT CHANGES LIMIT 3; Your output should resemble: @@ -786,6 +814,7 @@ Now join the two tables: FROM WAREHOUSE_LOCATION WL LEFT JOIN WAREHOUSE_SIZE WS ON WL.WAREHOUSE_ID=WS.WAREHOUSE_ID + EMIT CHANGES LIMIT 3; Your output should resemble: @@ -842,7 +871,7 @@ as part of the ``SELECT``: .. code:: sql - CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL; + CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES; Your output should resemble: @@ -882,7 +911,7 @@ Add stream of 3rd party orders into the existing output stream: .. 
code:: sql - INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY; + INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES; Your output should resemble: @@ -899,7 +928,7 @@ written to it: .. code:: sql - SELECT * FROM ALL_ORDERS; + SELECT * FROM ALL_ORDERS EMIT CHANGES; Your output should resemble the following. Note that there are messages from both source topics (denoted by ``LOCAL`` and ``3RD PARTY`` respectively). @@ -929,8 +958,8 @@ Your output should resemble: Query ID | Kafka Topic | Query String ------------------------------------------------------------------------------------------------------------------- - CSAS_ALL_ORDERS_0 | ALL_ORDERS | CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL; - InsertQuery_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY; + CSAS_ALL_ORDERS_0 | ALL_ORDERS | CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES; + InsertQuery_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES; ------------------------------------------------------------------------------------------------------------------- .. insert-into_02_end From 0da9f401d8b26590f27f199f644c141202e30899 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 3 Oct 2019 14:35:55 -0700 Subject: [PATCH 2/2] docs: more doc changes for push / pull queries --- docs/capacity-planning.rst | 15 +- docs/concepts/ksql-and-kafka-streams.rst | 3 +- docs/concepts/ksql-architecture.rst | 3 +- .../time-and-windows-in-ksql-queries.rst | 15 +- .../aggregate-streaming-data.rst | 15 +- docs/developer-guide/api.rst | 18 +- docs/developer-guide/create-a-stream.rst | 11 +- docs/developer-guide/create-a-table.rst | 30 +++- docs/developer-guide/implement-a-udf.rst | 2 +- .../join-streams-and-tables.rst | 6 +- docs/developer-guide/ksql-testing-tool.rst | 4 +- docs/developer-guide/partition-data.rst | 2 +- .../query-with-arrays-and-maps.rst | 9 +- .../query-with-structured-data.rst | 10 +- docs/developer-guide/serialization.rst | 14 +- docs/developer-guide/syntax-reference.rst | 115 +++++++++---- .../transform-a-stream-with-ksql.rst | 9 +- docs/developer-guide/udf.rst | 2 +- docs/includes/csas-snippet.sql | 3 +- docs/includes/ctas-snippet.sql | 3 +- docs/includes/ksql-includes.rst | 155 +++++++----------- .../server-config/avro-schema.rst | 3 +- .../server-config/config-reference.rst | 2 +- docs/operations.rst | 2 +- docs/troubleshoot-ksql.rst | 8 +- docs/tutorials/basics-control-center.rst | 8 +- docs/tutorials/clickstream-docker.rst | 6 +- docs/tutorials/examples.rst | 38 +++-- docs/tutorials/examples.sql | 33 ++-- docs/tutorials/generate-custom-test-data.rst | 2 +- 30 files changed, 324 insertions(+), 222 deletions(-) diff --git a/docs/capacity-planning.rst b/docs/capacity-planning.rst index 8b1352f57567..ac373487bdb9 100644 --- a/docs/capacity-planning.rst +++ b/docs/capacity-planning.rst @@ -82,7 +82,7 @@ Some queries require that the input stream be repartitioned so that all messages .. code:: sql - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid; + CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES; DESCRIBE EXTENDED pageviews_by_page; Your output should resemble: @@ -92,7 +92,7 @@ Some queries require that the input stream be repartitioned so that all messages ... 
Queries that write into this TABLE ----------------------------------- - id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid; + id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES; For query topology and execution plan please run: EXPLAIN @@ -107,7 +107,7 @@ Some queries require that the input stream be repartitioned so that all messages :: Type : QUERY - SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid; + SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES; Execution plan -------------- @@ -313,7 +313,8 @@ out all the views that lasted less than 10 seconds: WITH (PARTITIONS=64) AS SELECT * FROM pageviews_original - WHERE duration > 10; + WHERE duration > 10 + EMIT CHANGES; KSQL ++++ @@ -354,13 +355,15 @@ and then count up views by city: CREATE STREAM pageviews_meaningful_with_user_info WITH (PARTITIONS=64) AS SELECT pv.viewtime, pv.userid, pv.pageid, pv.client_ip, pv.url, pv.duration, pv.from_url, u.city, u.country, u.gender, u.email - FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid; + FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid + EMIT CHANGES; CREATE TABLE pageview_counts_by_city WITH (PARTITIONS=64) AS SELECT country, city, count(*) FROM pageviews_meaningful_with_user_info - GROUP BY country, city; + GROUP BY country, city + EMIT CHANGES; KSQL ++++ diff --git a/docs/concepts/ksql-and-kafka-streams.rst b/docs/concepts/ksql-and-kafka-streams.rst index a2ef170aa6e1..f3134dc1bfa9 100644 --- a/docs/concepts/ksql-and-kafka-streams.rst +++ b/docs/concepts/ksql-and-kafka-streams.rst @@ -28,7 +28,8 @@ For example, to implement simple fraud-detection logic on a Kafka topic named CREATE STREAM fraudulent_payments AS SELECT fraudProbability(data) FROM payments - WHERE fraudProbability(data) > 0.8; + WHERE fraudProbability(data) > 0.8 + EMIT CHANGES; The equivalent Java code on Kafka Streams might resemble: diff --git a/docs/concepts/ksql-architecture.rst b/docs/concepts/ksql-architecture.rst index d174947acdaa..262e8b1b5250 100644 --- a/docs/concepts/ksql-architecture.rst +++ b/docs/concepts/ksql-architecture.rst @@ -362,7 +362,8 @@ from the ``authorization_attempts`` stream: WINDOW TUMBLING (SIZE 5 SECONDS) WHERE region = ‘west’ GROUP BY card_number - HAVING count(*) > 3; + HAVING count(*) > 3 + EMIT CHANGES; The KSQL engine translates the DML statement into a Kafka Streams application. The application reads the source topic continuously, and whenever the diff --git a/docs/concepts/time-and-windows-in-ksql-queries.rst b/docs/concepts/time-and-windows-in-ksql-queries.rst index 1fe2275e0397..3c7ef88cac86 100644 --- a/docs/concepts/time-and-windows-in-ksql-queries.rst +++ b/docs/concepts/time-and-windows-in-ksql-queries.rst @@ -240,7 +240,8 @@ run a query like this: SELECT regionid, COUNT(*) FROM pageviews WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; The hopping window's start time is inclusive, but the end time is exclusive. This is important for non-overlapping windows, in which each record must be @@ -270,7 +271,8 @@ per zip code per hour in an ``orders`` stream, you might run a query like this: .. 
code:: sql SELECT orderzip_code, TOPK(order_total, 5) FROM orders - WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode; + WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode + EMIT CHANGES; Here's another example: to detect potential credit card fraud in an ``authorization_attempts`` stream, you might run a query for the number of @@ -281,7 +283,8 @@ a time interval of five seconds. SELECT card_number, count(*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) - GROUP BY card_number HAVING COUNT(*) > 3; + GROUP BY card_number HAVING COUNT(*) > 3 + EMIT CHANGES; The tumbling window's start time is inclusive, but the end time is exclusive. This is important for non-overlapping windows, in which each record must be @@ -325,7 +328,8 @@ per region: SELECT regionid, COUNT(*) FROM pageviews WINDOW SESSION (60 SECONDS) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; The start and end times for a session window are both inclusive, in contrast to time windows. @@ -356,7 +360,8 @@ For example, to find orders that have shipped within the last hour from an FROM new_orders o INNER JOIN shipments s WITHIN 1 HOURS - ON o.order_id = s.order_id; + ON o.order_id = s.order_id + EMIT CHANGES; For more information on joins, see :ref:`join-streams-and-tables`. diff --git a/docs/developer-guide/aggregate-streaming-data.rst b/docs/developer-guide/aggregate-streaming-data.rst index 23c5bf32a504..07831dc3d5d1 100644 --- a/docs/developer-guide/aggregate-streaming-data.rst +++ b/docs/developer-guide/aggregate-streaming-data.rst @@ -31,7 +31,8 @@ because the result of the query is a KSQL table. SELECT regionid, COUNT(*) FROM pageviews - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; Tombstone Records ================= @@ -63,7 +64,8 @@ This query computes the pageview count per region per minute: COUNT(*) FROM pageviews WINDOW TUMBLING (SIZE 1 MINUTE) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; To count the pageviews for “Region_6” by female users every 30 seconds, you can change the previous query to the following: @@ -76,7 +78,8 @@ To count the pageviews for “Region_6” by female users every FROM pageviews WINDOW TUMBLING (SIZE 30 SECONDS) WHERE UCASE(gender)='FEMALE' AND LCASE(regionid)='region_6' - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; Aggregate Records Over a Hopping Window ======================================= @@ -97,7 +100,8 @@ and substring matching. FROM pageviews WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; Aggregate Records Over a Session Window ======================================= @@ -113,7 +117,8 @@ the input data and performs the counting step per region. COUNT(*) FROM pageviews WINDOW SESSION (60 SECONDS) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; For more information, see :ref:`time-and-windows-in-ksql-queries`. diff --git a/docs/developer-guide/api.rst b/docs/developer-guide/api.rst index e614a2fce151..802329b9972b 100644 --- a/docs/developer-guide/api.rst +++ b/docs/developer-guide/api.rst @@ -47,7 +47,7 @@ Here's an example request that retrieves streaming data from ``TEST_STREAM``: curl -X "POST" "http://localhost:8088/query" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d $'{ - "ksql": "SELECT * FROM TEST_STREAM;", + "ksql": "SELECT * FROM TEST_STREAM EMIT CHANGES;", "streamsProperties": {} }' @@ -217,7 +217,7 @@ statements use the ``/query`` endpoint. 
Content-Type: application/vnd.ksql.v1+json { - "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';", + "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;", "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" } @@ -232,7 +232,7 @@ statements use the ``/query`` endpoint. [ { - "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';", + "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;", "commandId":"stream/PAGEVIEWS_HOME/create", "commandStatus": { "status":"SUCCESS", @@ -241,7 +241,7 @@ statements use the ``/query`` endpoint. "commandSequenceNumber":10 }, { - "statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';", + "statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;", "commandId":"stream/PAGEVIEWS_ALICE/create", "commandStatus": { "status":"SUCCESS", @@ -266,7 +266,7 @@ similar to the example request above: Content-Type: application/vnd.ksql.v1+json { - "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;" + "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;" } The second method is to submit the statements as separate requests and incorporate the interdependency by using ``commandSequenceNumber``. 
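For a scripted client, the same pattern can be automated: read the ``commandSequenceNumber`` from the
first response and pass it in the body of the next request. The following is a minimal sketch of that
pattern, assuming a KSQL server listening on ``localhost:8088`` and the Python ``requests`` package; it
is illustrative only and not part of this patch.

.. code:: python

    import requests

    KSQL_URL = "http://localhost:8088/ksql"
    HEADERS = {"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"}

    def run_statement(ksql, command_sequence_number=None):
        """Submit one KSQL statement; optionally wait for a previous command to finish."""
        payload = {"ksql": ksql, "streamsProperties": {}}
        if command_sequence_number is not None:
            # Ask the server not to execute this statement until the command with
            # this sequence number has completed.
            payload["commandSequenceNumber"] = command_sequence_number
        response = requests.post(KSQL_URL, headers=HEADERS, json=payload)
        response.raise_for_status()
        # The /ksql endpoint returns one result object per statement submitted.
        return response.json()[0]

    # First request: create the stream and note the returned command sequence number.
    first = run_statement(
        "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original "
        "WHERE pageid='home' EMIT CHANGES;")
    seq = first["commandSequenceNumber"]

    # Second request: only executes after the first command has completed.
    run_statement(
        "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) "
        "FROM pageviews_home GROUP BY userid EMIT CHANGES;",
        command_sequence_number=seq)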
@@ -279,7 +279,7 @@ Send the first request: Content-Type: application/vnd.ksql.v1+json { - "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';" + "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;" } Make note of the ``commandSequenceNumber`` returned in the response: @@ -291,7 +291,7 @@ Make note of the ``commandSequenceNumber`` returned in the response: [ { - "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';", + "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;", "commandId":"stream/PAGEVIEWS_HOME/create", "commandStatus": { "status":"SUCCESS", @@ -311,7 +311,7 @@ execute until after command number 10 has finished executing: Content-Type: application/vnd.ksql.v1+json { - "ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;", + "ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;", "commandSequenceNumber":10 } @@ -346,7 +346,7 @@ The ``/query`` resource lets you stream the output records of a ``SELECT`` state Content-Type: application/vnd.ksql.v1+json { - "ksql": "SELECT * FROM pageviews;", + "ksql": "SELECT * FROM pageviews EMIT CHANGES;", "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" } diff --git a/docs/developer-guide/create-a-stream.rst b/docs/developer-guide/create-a-stream.rst index 091bb6f1c3d7..75eb5cf22f2d 100644 --- a/docs/developer-guide/create-a-stream.rst +++ b/docs/developer-guide/create-a-stream.rst @@ -45,7 +45,8 @@ In the KSQL CLI, paste the following CREATE STREAM statement: userid VARCHAR, pageid VARCHAR) WITH (KAFKA_TOPIC='pageviews', - VALUE_FORMAT='DELIMITED'); + VALUE_FORMAT='DELIMITED') + EMIT CHANGES; Your output should resemble: @@ -155,7 +156,8 @@ like this: WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='DELIMITED', KEY='pageid', - TIMESTAMP='viewtime'); + TIMESTAMP='viewtime') + EMIT CHANGES; Confirm that the TIMESTAMP field is ``viewtime`` by using the DESCRIBE EXTENDED statement: @@ -223,7 +225,8 @@ results from a persistent query that matches "introductory" pages that have a CREATE STREAM pageviews_intro AS SELECT * FROM pageviews - WHERE pageid < 'Page_20'; + WHERE pageid < 'Page_20' + EMIT CHANGES; Your output should resemble: @@ -273,7 +276,7 @@ Your output should resemble: Query ID | Kafka Topic | Query String -------------------------------------------------------------------------------------------------------------------------------------------- - CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20'; + CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES; -------------------------------------------------------------------------------------------------------------------------------------------- For detailed information on a Query run: EXPLAIN ; diff --git a/docs/developer-guide/create-a-table.rst b/docs/developer-guide/create-a-table.rst index fd8d0c0cc207..3817100d74e1 100644 --- a/docs/developer-guide/create-a-table.rst +++ b/docs/developer-guide/create-a-table.rst @@ -106,7 +106,7 @@ statement: .. 
code:: sql - SELECT * FROM users; + SELECT * FROM users EMIT CHANGES; Your output should resemble: @@ -144,7 +144,8 @@ results from a persistent query for users that have ``gender`` set to ``FEMALE`` CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users - WHERE gender='FEMALE'; + WHERE gender='FEMALE' + EMIT CHANGES; Your output should resemble: @@ -206,7 +207,7 @@ Your output should resemble: Query ID | Kafka Topic | Query String ----------------------------------------------------------------------------------------------------------------------------------------- - CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE'; + CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE' EMIT CHANGES; ----------------------------------------------------------------------------------------------------------------------------------------- For detailed information on a Query run: EXPLAIN ; @@ -225,7 +226,8 @@ function like COUNT(*) in the SELECT clause. CREATE TABLE pageviews_table AS SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES) - GROUP BY viewtime, userid, pageid; + GROUP BY viewtime, userid, pageid + EMIT CHANGES; Your output should resemble: @@ -237,11 +239,11 @@ Your output should resemble: --------------------------- ksql> -Inspect the table by using a SELECT statement. +Observe the changes happening to the table using a streaming SELECT statement. .. code:: sql - SELECT * FROM pageviews_table; + SELECT * FROM pageviews_table EMIT CHANGES; Your output should resemble: @@ -257,6 +259,22 @@ Your output should resemble: ^CQuery terminated ksql> +Lookup the value for a specific key within the table using a SELECT statement. + +.. code:: sql + + SELECT * FROM pageviews_table WHERE ROWKEY='1557183929488|+|User_9|+|Page_39'; + +Your output should resemble: + +:: + + ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | VIEWTIME BIGINT | USERID STRING | PAGEID STRING | TOTAL BIGINT + ---------------------------------------------------------------------------------------------------------------------------- + 1557183929488|+|User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 + ---------------------------------------------------------------------------------------------------------------------------- + ksql> + Delete a KSQL Table ******************* diff --git a/docs/developer-guide/implement-a-udf.rst b/docs/developer-guide/implement-a-udf.rst index 5a6d5130f9cf..14b497bad79b 100644 --- a/docs/developer-guide/implement-a-udf.rst +++ b/docs/developer-guide/implement-a-udf.rst @@ -362,7 +362,7 @@ Use the MULTIPLY function in a query. 
If you follow the steps in :: - SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original; + SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original EMIT CHANGES; Your output should resemble: diff --git a/docs/developer-guide/join-streams-and-tables.rst b/docs/developer-guide/join-streams-and-tables.rst index 92a715052cea..5ce13d8a8245 100644 --- a/docs/developer-guide/join-streams-and-tables.rst +++ b/docs/developer-guide/join-streams-and-tables.rst @@ -30,7 +30,8 @@ combination of a ``pageviews`` stream and a ``users`` table: CREATE STREAM pageviews_enriched AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews - LEFT JOIN users ON pageviews.userid = users.userid; + LEFT JOIN users ON pageviews.userid = users.userid + EMIT CHANGES; For the full code example, see :ref:`ksql_quickstart-docker`. @@ -51,7 +52,8 @@ expected time of two hours. CREATE STREAM late_orders AS SELECT o.orderid, o.itemid FROM orders o FULL OUTER JOIN shipments s WITHIN 2 HOURS - ON s.orderid = o.orderid WHERE s.orderid IS NULL; + ON s.orderid = o.orderid WHERE s.orderid IS NULL + EMIT CHANGES; Joins and Windows ***************** diff --git a/docs/developer-guide/ksql-testing-tool.rst b/docs/developer-guide/ksql-testing-tool.rst index ec13039ad0ac..6f395e4d50f3 100644 --- a/docs/developer-guide/ksql-testing-tool.rst +++ b/docs/developer-guide/ksql-testing-tool.rst @@ -58,7 +58,7 @@ Here is a sample statements file for the testing tool: .. code:: sql CREATE STREAM orders (ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON'); - CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders; + CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders EMIT CHANGES; Input File ---------- @@ -186,7 +186,7 @@ are submitted later can affect the output of a query. For example, consider the INSERT INTO orders VALUES(10.0); INSERT INTO orders VALUES(15.0); INSERT INTO orders VALUES(20.0); - CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders; + CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders EMIT CHANGES; INSERT INTO orders VALUES(25.0); INSERT INTO orders VALUES(30.0); diff --git a/docs/developer-guide/partition-data.rst b/docs/developer-guide/partition-data.rst index 472d407400d4..3c4f3ae8fe13 100644 --- a/docs/developer-guide/partition-data.rst +++ b/docs/developer-guide/partition-data.rst @@ -111,7 +111,7 @@ use the following KSQL statement: .. code:: sql - CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY product_id; + CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY product_id EMIT CHANGES; For more information, see `Inspecting and Changing Topic Keys `__ in the `Stream Processing Cookbook `__. diff --git a/docs/developer-guide/query-with-arrays-and-maps.rst b/docs/developer-guide/query-with-arrays-and-maps.rst index dfffad6e638a..83c69ba90065 100644 --- a/docs/developer-guide/query-with-arrays-and-maps.rst +++ b/docs/developer-guide/query-with-arrays-and-maps.rst @@ -69,7 +69,8 @@ and the user's city and zip code from the ``contactinfo`` map. 
userid, gender, regionid - FROM users; + FROM users + EMIT CHANGES; Your output should resemble: @@ -85,7 +86,8 @@ Run the following SELECT query to view the table: .. code:: sql SELECT userid, first_interest, city, zipcode - FROM users_interest_and_contactinfo; + FROM users_interest_and_contactinfo + EMIT CHANGES; Your output should resemble: @@ -113,7 +115,8 @@ For example, to get the user's last interest run the following SELECT statement: userid, gender, regionid - FROM users_extended; + FROM users_extended + EMIT CHANGES; Your output should resemble: diff --git a/docs/developer-guide/query-with-structured-data.rst b/docs/developer-guide/query-with-structured-data.rst index 33d7cb597a8a..769760d6f55a 100644 --- a/docs/developer-guide/query-with-structured-data.rst +++ b/docs/developer-guide/query-with-structured-data.rst @@ -183,7 +183,7 @@ Run a SELECT query to inspect the ``T`` stream: .. code:: sql - SELECT * FROM T; + SELECT * FROM T EMIT CHANGES; Your output should resemble: @@ -208,7 +208,7 @@ nested elements: .. code:: sql - SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2; + SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' EMIT CHANGES LIMIT 2; Your output should resemble: @@ -223,7 +223,7 @@ Query the other nested elements: .. code:: sql - SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2' LIMIT 2; + SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2' EMIT CHANGES LIMIT 2; Your output should resemble: @@ -240,11 +240,11 @@ new streams. .. code:: sql - CREATE STREAM TYPE_1 AS SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1'; + CREATE STREAM TYPE_1 AS SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' EMIT CHANGES; .. code:: sql - CREATE STREAM TYPE_2 AS SELECT DATA->"field-a", DATA->"field-c",DATA->"field-d" FROM T2 WHERE TYPE='key2'; + CREATE STREAM TYPE_2 AS SELECT DATA->"field-a", DATA->"field-c",DATA->"field-d" FROM T2 WHERE TYPE='key2' EMIT CHANGES; For both statements, your output should resemble: diff --git a/docs/developer-guide/serialization.rst b/docs/developer-guide/serialization.rst index 9dee9c866d58..9afc3b44bda9 100644 --- a/docs/developer-guide/serialization.rst +++ b/docs/developer-guide/serialization.rst @@ -109,7 +109,7 @@ For example, consider the statements: .. code:: sql CREATE STREAM x (f0 INT, f1 STRING) WITH (VALUE_FORMAT='JSON', ...); - CREATE STREAM y AS SELECT f0 FROM x; + CREATE STREAM y AS SELECT f0 FROM x EMIT CHANGES; The second statement defines a stream with only a single field in the value, named ``f0``. @@ -136,7 +136,7 @@ For example, .. code:: sql - CREATE STREAM y WITH(WRAP_SINGLE_VALUE=false) AS SELECT f0 FROM x; + CREATE STREAM y WITH(WRAP_SINGLE_VALUE=false) AS SELECT f0 FROM x EMIT CHANGES; If a statement doesn't set the value wrapping explicitly, KSQL uses the system default, defined by ``ksql.persistence.wrap.single.values``. You can change the system default. @@ -179,14 +179,14 @@ Single-field serialization examples -- creates a stream, picking up the system default of wrapping values. -- the serialized values in the sink topic will be wrapped. - CREATE STREAM IMPLICIT_SINK AS SELECT ID FROM S; + CREATE STREAM IMPLICIT_SINK AS SELECT ID FROM S EMIT CHANGES; -- override 'ksql.persistence.wrap.single.values' to false -- the serialized values will not be wrapped. 
- CREATE STREAM EXPLICIT_SINK WITH(WRAP_SINGLE_VALUE=false) AS SELECT ID FROM S; + CREATE STREAM EXPLICIT_SINK WITH(WRAP_SINGLE_VALUE=false) AS SELECT ID FROM S EMIT CHANGES; -- results in an error as the value schema is multi-field - CREATE STREAM BAD_SINK WITH(WRAP_SINGLE_VALUE=true) AS SELECT ID, COST FROM S; + CREATE STREAM BAD_SINK WITH(WRAP_SINGLE_VALUE=true) AS SELECT ID, COST FROM S EMIT CHANGES; .. _ksql_formats: @@ -286,7 +286,7 @@ the ``WRAP_SINGLE_VALUE`` is set to ``false``, for example: .. code:: sql - CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x; + CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES; For more information, see :ref:`ksql_single_field_wrapping`. @@ -363,7 +363,7 @@ the ``WRAP_SINGLE_VALUE`` is set to ``false``, for example: .. code:: sql - CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x; + CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES; For more information, see :ref:`ksql_single_field_wrapping`. diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 0296717b02c7..11f4bd39bf8d 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -558,7 +558,8 @@ CREATE STREAM AS SELECT FROM from_stream [ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria [ WHERE condition ] - [PARTITION BY column_name]; + [PARTITION BY column_name] + EMIT CHANGES; **Description** @@ -678,7 +679,8 @@ CREATE TABLE AS SELECT [ WINDOW window_expression ] [ WHERE condition ] [ GROUP BY grouping_expression ] - [ HAVING having_expression ]; + [ HAVING having_expression ] + EMIT CHANGES; **Description** @@ -783,7 +785,8 @@ INSERT INTO SELECT select_expr [., ...] 
      FROM from_stream
      [ WHERE condition ]
-      [ PARTITION BY column_name ];
+      [ PARTITION BY column_name ]
+      EMIT CHANGES;
 
 **Description**
 
@@ -936,7 +939,7 @@ Your output should resemble:
 
     Queries that write into this TABLE
     -----------------------------------
-    id:CTAS_IP_SUM - CREATE TABLE IP_SUM as SELECT ip, sum(bytes)/1024 as kbytes FROM CLICKSTREAM window SESSION (300 second) GROUP BY ip;
+    id:CTAS_IP_SUM - CREATE TABLE IP_SUM as SELECT ip, sum(bytes)/1024 as kbytes FROM CLICKSTREAM window SESSION (300 second) GROUP BY ip EMIT CHANGES;
 
     For query topology and execution plan run: EXPLAIN ; for more information
 
@@ -987,7 +990,7 @@ Your output should resemble:
 ::
 
     Type : QUERY
-    SQL : CREATE TABLE IP_SUM as SELECT ip, sum(bytes)/1024 as kbytes FROM CLICKSTREAM window SESSION (300 second) GROUP BY ip;
+    SQL : CREATE TABLE IP_SUM as SELECT ip, sum(bytes)/1024 as kbytes FROM CLICKSTREAM window SESSION (300 second) GROUP BY ip EMIT CHANGES;
 
 
 Local runtime statistics
 
@@ -1108,11 +1111,55 @@ Your output should resemble:
 
     Format:JSON
     {"ROWTIME":1516010696273,"ROWKEY":"\"stream/CLICKSTREAM/create\"","statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
-    {"ROWTIME":1516010709492,"ROWKEY":"\"table/EVENTS_PER_MIN/create\"","statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid;","streamsProperties":{}}
+    {"ROWTIME":1516010709492,"ROWKEY":"\"table/EVENTS_PER_MIN/create\"","statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
     ^CTopic printing ceased
 
-SELECT
------
+PULL QUERY
+----------
+
+**Synopsis**
+
+.. code:: sql
+
+    SELECT select_expr [, ...]
+      FROM aggregate_table
+      WHERE ROWKEY=key
+      [AND window_bounds];
+
+**Description**
+
+Pulls the current value from the materialized table and terminates.
+The result of this statement will not be persisted in a Kafka topic and will only be printed out in
+the console.
+
+The WHERE clause must contain a single value of ``ROWKEY`` to retrieve and may optionally include
+bounds on WINDOWSTART if the materialized table is windowed.
+
+Example:
+
+.. code:: sql
+
+    SELECT * FROM pageviews_by_region
+      WHERE ROWKEY = 'Region_1'
+        AND 1570051876000 <= WINDOWSTART AND WINDOWSTART <= 1570138276000;
+
+When writing logical expressions using ``WINDOWSTART``, ISO-8601 formatted datestrings can also be
+used to represent date times.
+For example, the above query is equivalent to the following:
+
+.. code:: sql
+
+    SELECT * FROM pageviews_by_region
+      WHERE ROWKEY = 'Region_1'
+        AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16';
+
+Timezones can be specified within the datestring. For example, `2017-11-17T04:53:45-0330` is in the Newfoundland time
+zone. If no timezone is specified within the datestring, then timestamps are interpreted in the UTC timezone.
+
+If no bounds are placed on ``WINDOWSTART``, then rows will be returned for all windows in the windowed table.
+
+PUSH QUERY
+----------
 
 **Synopsis**
 
 .. code:: sql
 
@@ -1125,12 +1172,13 @@ SELECT
       [ WHERE condition ]
       [ GROUP BY grouping_expression ]
      [ HAVING having_expression ]
+      EMIT CHANGES
       [ LIMIT count ];
 
 **Description**
 
-Selects rows from a KSQL stream or table.
The result of this statement -will not be persisted in a Kafka topic and will only be printed out in +Push a continuous stream of updates to the KSQL stream or table. +The result of this statement will not be persisted in a Kafka topic and will only be printed out in the console. To stop the continuous query in the CLI press ``Ctrl-C``. Note that the WINDOW clause can only be used if the ``from_item`` is a stream. @@ -1151,7 +1199,7 @@ Example: WHERE ROWTIME >= 1510923225000 AND ROWTIME <= 1510923228000; -When writing logical expressions using ``ROWTIME``, ISO-8601 formatted datestrings can also be used to represent dates. +When writing logical expressions using ``ROWTIME``, ISO-8601 formatted datestrings can also be used to represent date times. For example, the above query is equivalent to the following: .. code:: sql @@ -1172,7 +1220,7 @@ Example: .. code:: sql - SELECT * FROM pageviews LIMIT 5; + SELECT * FROM pageviews EMIT CHANGES LIMIT 5; If no limit is supplied the query will run until terminated, streaming back all results to the console. @@ -1201,7 +1249,8 @@ the following WINDOW types: SELECT item_id, SUM(quantity) FROM orders WINDOW TUMBLING (SIZE 20 SECONDS) - GROUP BY item_id; + GROUP BY item_id + EMIT CHANGES; - **HOPPING**: Hopping windows group input records into fixed-sized, (possibly) overlapping windows based on the records’ timestamps. You @@ -1215,7 +1264,8 @@ the following WINDOW types: SELECT item_id, SUM(quantity) FROM orders WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS) - GROUP BY item_id; + GROUP BY item_id + EMIT CHANGES; - **SESSION**: Session windows group input records into so-called sessions. You must specify the *session inactivity gap* parameter for @@ -1232,7 +1282,8 @@ the following WINDOW types: SELECT item_id, SUM(quantity) FROM orders WINDOW SESSION (20 SECONDS) - GROUP BY item_id; + GROUP BY item_id + EMIT CHANGES; Every output column of an expression in the SELECT list has an output name. To specify the output name of a column, use ``AS OUTPUT_NAME`` after the expression definition. If it is omitted, KSQL will assign a system generated name @@ -1244,7 +1295,8 @@ a column of a from_item, then the output name is the name of that column. .. code:: sql SELECT 1, KSQL_COL_0 - FROM orders; + FROM orders + EMIT CHANGES; is not allowed as the output name for the literal ``1`` is ``KSQL_COL_0``. @@ -1266,7 +1318,8 @@ example of converting a BIGINT into a VARCHAR type: SELECT page_id, CONCAT(CAST(COUNT(*) AS VARCHAR), '_HELLO') FROM pageviews_enriched WINDOW TUMBLING (SIZE 20 SECONDS) - GROUP BY page_id; + GROUP BY page_id + EMIT CHANGES; CASE ~~~~ @@ -1300,7 +1353,8 @@ statement. Here's an example of a CASE expression: WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_result - FROM orders; + FROM orders + EMIT CHANGES; LIKE ~~~~ @@ -1320,7 +1374,8 @@ Example: SELECT user_id FROM users - WHERE user_id LIKE 'santa%'; + WHERE user_id LIKE 'santa%' + EMIT CHANGES; BETWEEN ~~~~~~~ @@ -1342,6 +1397,7 @@ Example: SELECT event FROM events WHERE event_id BETWEEN 10 AND 20 + EMIT CHANGES; SHOW FUNCTIONS -------------- @@ -1491,13 +1547,13 @@ The explanation for each operator includes a supporting example based on the fol .. code:: sql - SELECT LEN(FIRST_NAME) + LEN(LAST_NAME) AS NAME_LENGTH FROM USERS; + SELECT LEN(FIRST_NAME) + LEN(LAST_NAME) AS NAME_LENGTH FROM USERS EMIT CHANGES; - Concatenation (``+,||``) The concatenation operator can be used to concatenate STRING values. .. 
code:: sql - SELECT FIRST_NAME + LAST_NAME AS FULL_NAME FROM USERS; + SELECT FIRST_NAME + LAST_NAME AS FULL_NAME FROM USERS EMIT CHANGES; - You can use the ``+`` operator for multi-part concatenation, for example: @@ -1510,21 +1566,22 @@ The explanation for each operator includes a supporting example based on the fol CAST(INVALID_LOGIN_COUNT AS VARCHAR) + ' attempts in the last minute (threshold is >=4)' FROM INVALID_USERS_LOGINS_PER_HOST - WHERE INVALID_LOGIN_COUNT>=4; + WHERE INVALID_LOGIN_COUNT>=4 + EMIT CHANGES; - Source Dereference (``.``) The source dereference operator can be used to specify columns by dereferencing the source stream or table. .. code:: sql - SELECT USERS.FIRST_NAME FROM USERS; + SELECT USERS.FIRST_NAME FROM USERS EMIT CHANGES; - Subscript (``[subscript_expr]``) The subscript operator is used to reference the value at an array index or a map key. .. code:: sql - SELECT NICKNAMES[0] FROM USERS; + SELECT NICKNAMES[0] FROM USERS EMIT CHANGES; - STRUCT dereference (``->``) Access nested data by declaring a STRUCT and using the dereference operator to access its fields: @@ -1535,13 +1592,13 @@ The explanation for each operator includes a supporting example based on the fol orderId BIGINT, address STRUCT) WITH (...); - SELECT address->street, address->zip FROM orders; + SELECT address->street, address->zip FROM orders EMIT CHANGES; - Combine `->` with `.` when using aliases: .. code:: sql - SELECT orders.address->street, o.address->zip FROM orders o; + SELECT orders.address->street, o.address->zip FROM orders o EMIT CHANGES; .. _functions: @@ -2010,7 +2067,8 @@ Example: WITH(KAFKA_TOPIC='users-with-proper-key') AS SELECT CAST(userid as VARCHAR) as userid_string, username, email FROM users_with_wrong_key_format - PARTITION BY userid_string; + PARTITION BY userid_string + EMIT CHANGES; -- Now you can create the table on the properly keyed stream. CREATE TABLE users_table (userid_string VARCHAR, username VARCHAR, email VARCHAR) @@ -2041,7 +2099,8 @@ Example: WITH(KAFKA_TOPIC='users-with-proper-key') AS SELECT CAST(ROWKEY as VARCHAR) as userid_string, username, email FROM users_with_missing_key - PARTITION BY userid_string; + PARTITION BY userid_string + EMIT CHANGES; -- Now you can create the table on the properly keyed stream. CREATE TABLE users_table (userid_string VARCHAR, username VARCHAR, email VARCHAR) diff --git a/docs/developer-guide/transform-a-stream-with-ksql.rst b/docs/developer-guide/transform-a-stream-with-ksql.rst index 21dca3b3b6e5..bb9d9cd3ae53 100644 --- a/docs/developer-guide/transform-a-stream-with-ksql.rst +++ b/docs/developer-guide/transform-a-stream-with-ksql.rst @@ -50,7 +50,8 @@ The following statement generates a new stream, named pageid, TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring FROM pageviews - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; Content-based Routing ********************* @@ -74,7 +75,8 @@ different users selected into the output. pageid FROM pageviews WHERE userid='User_1' OR userid='User_2' - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; .. code:: sql @@ -84,7 +86,8 @@ different users selected into the output. pageid FROM pageviews WHERE userid<>'User_1' AND userid<>'User_2' - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; Next Steps ********** diff --git a/docs/developer-guide/udf.rst b/docs/developer-guide/udf.rst index 69ce4d01fc6a..4f344420b686 100644 --- a/docs/developer-guide/udf.rst +++ b/docs/developer-guide/udf.rst @@ -744,7 +744,7 @@ built-in functions. 
The function names are case-insensitive. For example, using CREATE STREAM number_stream (int1 INT, int2 INT, long1 BIGINT, long2 BIGINT) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'numbers'); - SELECT multiply(int1, int2), MULTIPLY(long1, long2) FROM number_stream; + SELECT multiply(int1, int2), MULTIPLY(long1, long2) FROM number_stream EMIT CHANGES; diff --git a/docs/includes/csas-snippet.sql b/docs/includes/csas-snippet.sql index d73129d1f677..38228e88a4d6 100644 --- a/docs/includes/csas-snippet.sql +++ b/docs/includes/csas-snippet.sql @@ -1,3 +1,4 @@ CREATE STREAM foo WITH (TIMESTAMP='t2') AS SELECT * FROM bar - WINDOW TUMBLING (size 10 seconds); \ No newline at end of file + WINDOW TUMBLING (size 10 seconds) + EMIT CHANGES; \ No newline at end of file diff --git a/docs/includes/ctas-snippet.sql b/docs/includes/ctas-snippet.sql index 05dccda643b8..133ea481039e 100644 --- a/docs/includes/ctas-snippet.sql +++ b/docs/includes/ctas-snippet.sql @@ -1,4 +1,5 @@ CREATE TABLE foo WITH (TIMESTAMP='t2') AS SELECT host, COUNT(*) FROM bar WINDOW TUMBLING (size 10 seconds) - GROUP BY host; + GROUP BY host + EMIT CHANGES; diff --git a/docs/includes/ksql-includes.rst b/docs/includes/ksql-includes.rst index 26dd8538257b..5375d446fba2 100644 --- a/docs/includes/ksql-includes.rst +++ b/docs/includes/ksql-includes.rst @@ -138,7 +138,7 @@ Inspect the ``users`` topic by using the PRINT statement: .. code:: sql - PRINT users; + PRINT 'users'; Your output should resemble: @@ -157,7 +157,7 @@ Inspect the ``pageviews`` topic by using the PRINT statement: .. code:: sql - PRINT pageviews LIMIT 3; + PRINT 'pageviews'; Your output should resemble: @@ -167,8 +167,12 @@ Your output should resemble: 10/23/18 12:24:03 AM UTC , 9461 , 1540254243183,User_9,Page_20 10/23/18 12:24:03 AM UTC , 9471 , 1540254243617,User_7,Page_47 10/23/18 12:24:03 AM UTC , 9481 , 1540254243888,User_4,Page_27 + ^C10/23/18 12:24:05 AM UTC , 9521 , 1540254245161,User_9,Page_62 + Topic printing ceased ksql> +Press CTRL+C to stop printing messages. + For more information, see :ref:`ksql_syntax_reference`. .. inspect_topics_end @@ -255,19 +259,16 @@ the latest offset. .. code:: sql - SELECT pageid FROM pageviews_original EMIT CHANGES LIMIT 3; + SELECT pageid FROM pageviews_original LIMIT 3; Your output should resemble: :: - +-----------------+ - |PAGEID | - +-----------------+ - |Page_63 | - |Page_44 | - |Page_59 | - Limit Reached + Page_24 + Page_73 + Page_78 + LIMIT reached Query terminated #. Create a persistent query by using the ``CREATE STREAM`` keywords to precede the ``SELECT`` statement. The results from this @@ -280,18 +281,16 @@ the latest offset. SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original - ON pageviews_original.userid = users_original.userid - EMIT CHANGES; + ON pageviews_original.userid = users_original.userid; Your output should resemble: :: Message - ---------------------------------------------------------------------------------------------------------- - Stream PAGEVIEWS_ENRICHED created and running. Created by query with query ID: CSAS_PAGEVIEWS_ENRICHED_4 - ---------------------------------------------------------------------------------------------------------- - + ---------------------------- + Stream created and running + ---------------------------- .. tip:: You can run ``DESCRIBE pageviews_enriched;`` to describe the stream. @@ -300,18 +299,15 @@ the latest offset. .. 
code:: sql - SELECT * FROM pageviews_enriched EMIT CHANGES; + SELECT * FROM pageviews_enriched; Your output should resemble: :: - +---------------------+-----------+--------+---------+----------+----------+ - |ROWTIME |ROWKEY |USERID |PAGEID |REGIONID |GENDER | - +---------------------+-----------+--------+---------+----------+----------+ - |1570059294609 |User_7 |User_7 |Page_80 |Region_3 |FEMALE | - |1570059294864 |User_7 |User_7 |Page_19 |Region_3 |FEMALE | - |1570059295004 |User_1 |User_1 |Page_27 |Region_7 |OTHER | + 1519746861328 | User_4 | User_4 | Page_58 | Region_5 | OTHER + 1519746861794 | User_9 | User_9 | Page_94 | Region_9 | MALE + 1519746862164 | User_1 | User_1 | Page_90 | Region_7 | FEMALE ^CQuery terminated #. Create a new persistent query where a condition limits the streams content, using ``WHERE``. Results from this query @@ -321,17 +317,16 @@ the latest offset. CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched - WHERE gender = 'FEMALE' - EMIT CHANGES; + WHERE gender = 'FEMALE'; Your output should resemble: :: Message - ------------------------------------------------------------------------------------------------------ - Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_5 - ------------------------------------------------------------------------------------------------------ + ---------------------------- + Stream created and running + ---------------------------- .. tip:: You can run ``DESCRIBE pageviews_female;`` to describe the stream. @@ -343,85 +338,62 @@ the latest offset. CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female - WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' - EMIT CHANGES; + WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'; Your output should resemble: :: Message - ---------------------------------------------------------------------------------------------------------------------- - Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_6 - ---------------------------------------------------------------------------------------------------------------------- + ---------------------------- + Stream created and running + ---------------------------- -#. Create a new persistent query that counts the pageviews for each region in a :ref:`tumbling window ` - of 30 seconds when the count is greater than one. Results from this query are written to the ``PAGEVIEWS_REGIONS`` - Kafka topic in the Avro format. KSQL will register the Avro schema with the configured |sr| when it writes the first - message to the ``PAGEVIEWS_REGIONS`` topic. +#. Create a new persistent query that counts the pageviews for each region and gender combination in a + :ref:`tumbling window ` of 30 seconds when the count is greater than one. Results from this query + are written to the ``PAGEVIEWS_REGIONS`` Kafka topic in the Avro format. KSQL will register the Avro schema with the + configured |sr| when it writes the first message to the ``PAGEVIEWS_REGIONS`` topic. .. 
code:: sql CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS - SELECT regionid , COUNT(*) AS numusers + SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) - GROUP BY regionid - HAVING COUNT(*) > 1 - EMIT CHANGES; + GROUP BY gender, regionid + HAVING COUNT(*) > 1; Your output should resemble: :: Message - ------------------------------------------------------------------------------------------------------- - Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_7 - ------------------------------------------------------------------------------------------------------- + --------------------------- + Table created and running + --------------------------- .. tip:: You can run ``DESCRIBE pageviews_regions;`` to describe the table. -#. Optional: View the changes to the above table in realtime using ``SELECT``. +#. Optional: View results from the above queries using ``SELECT``. .. code:: sql - SELECT regionid, numusers FROM pageviews_regions EMIT CHANGES LIMIT 5; + SELECT gender, regionid, numusers FROM pageviews_regions LIMIT 5; Your output should resemble: :: - +-------------+-----------+ - |REGIONID |NUMUSERS | - +-------------+-----------+ - |Region_4 |2 | - |Region_1 |6 | - |Region_3 |3 | - |Region_5 |2 | - |Region_2 |2 | + FEMALE | Region_6 | 3 + FEMALE | Region_1 | 4 + FEMALE | Region_9 | 6 + MALE | Region_8 | 2 + OTHER | Region_5 | 4 LIMIT reached Query terminated ksql> -#. Optional: Query the materialized table for the values stored within KSQL for a specific key: - -.. code:: sql - - SELECT * FROM pageviews_regions WHERE ROWKEY='Region_4'; - - Your output should resemble: - - :: - - ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | REGIONID STRING | NUMUSERS BIGINT - -------------------------------------------------------------------------------- - Region_4 | 1570133790000 | Region_1 | 6 - Region_4 | 1570133820000 | Region_1 | 14 - Region_4 | 1570133850000 | Region_1 | 16 - Region_4 | 1570133880000 | Region_1 | 14 - Region_4 | 1570133910000 | Region_1 | 16 - #. Optional: Show all persistent queries. .. code:: sql @@ -434,10 +406,10 @@ the latest offset. 
Query ID | Kafka Topic | Query String -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - CSAS_PAGEVIEWS_FEMALE_1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES; - CTAS_PAGEVIEWS_REGIONS_3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1 EMIT CHANGES; - CSAS_PAGEVIEWS_FEMALE_LIKE_89_2 | PAGEVIEWS_FEMALE_LIKE_89 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' EMIT CHANGES; - CSAS_PAGEVIEWS_ENRICHED_0 | PAGEVIEWS_ENRICHED | CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid EMIT CHANGES; + CSAS_PAGEVIEWS_FEMALE_1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE'; + CTAS_PAGEVIEWS_REGIONS_3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1; + CSAS_PAGEVIEWS_FEMALE_LIKE_89_2 | PAGEVIEWS_FEMALE_LIKE_89 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'; + CSAS_PAGEVIEWS_ENRICHED_0 | PAGEVIEWS_ENRICHED | CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid; -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- For detailed information on a Query run: EXPLAIN ; @@ -454,23 +426,24 @@ the latest offset. 
Name : PAGEVIEWS_REGIONS Type : TABLE - Key field : REGIONID + Key field : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1 Key format : STRING Timestamp field : Not set - using Value format : AVRO - Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1) + Kafka topic : PAGEVIEWS_REGIONS (partitions: 4, replication: 1) Field | Type -------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) + GENDER | VARCHAR(STRING) REGIONID | VARCHAR(STRING) NUMUSERS | BIGINT -------------------------------------- Queries that write into this TABLE ----------------------------------- - CTAS_PAGEVIEWS_REGIONS_3 : CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT regionid, COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY regionid HAVING COUNT(*) > 1 EMIT CHANGES; + CTAS_PAGEVIEWS_REGIONS_3 : CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1; For query topology and execution plan please run: EXPLAIN @@ -603,7 +576,7 @@ Query the data, using ``->`` notation to access the Struct contents: .. code:: sql - SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES; + SELECT ORDERID, ADDRESS->CITY FROM ORDERS; Your output should resemble: @@ -668,7 +641,7 @@ For the ``NEW_ORDERS`` topic, run: .. code:: sql - SELECT ORDER_ID, TOTAL_AMOUNT, CUSTOMER_NAME FROM NEW_ORDERS EMIT CHANGES LIMIT 3; + SELECT ORDER_ID, TOTAL_AMOUNT, CUSTOMER_NAME FROM NEW_ORDERS LIMIT 3; Your output should resemble: @@ -682,7 +655,7 @@ For the ``SHIPMENTS`` topic, run: .. code:: sql - SELECT ORDER_ID, SHIPMENT_ID, WAREHOUSE FROM SHIPMENTS EMIT CHANGES LIMIT 2; + SELECT ORDER_ID, SHIPMENT_ID, WAREHOUSE FROM SHIPMENTS LIMIT 2; Your output should resemble: @@ -701,8 +674,7 @@ based on a join window of 1 hour. FROM NEW_ORDERS O INNER JOIN SHIPMENTS S WITHIN 1 HOURS - ON O.ORDER_ID = S.ORDER_ID - EMIT CHANGES; + ON O.ORDER_ID = S.ORDER_ID; Your output should resemble: @@ -778,7 +750,7 @@ Inspect the WAREHOUSE_LOCATION table: .. code:: sql - SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION EMIT CHANGES LIMIT 3; + SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION LIMIT 3; Your output should resemble: @@ -794,7 +766,7 @@ Inspect the WAREHOUSE_SIZE table: .. code:: sql - SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE EMIT CHANGES LIMIT 3; + SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE LIMIT 3; Your output should resemble: @@ -814,7 +786,6 @@ Now join the two tables: FROM WAREHOUSE_LOCATION WL LEFT JOIN WAREHOUSE_SIZE WS ON WL.WAREHOUSE_ID=WS.WAREHOUSE_ID - EMIT CHANGES LIMIT 3; Your output should resemble: @@ -871,7 +842,7 @@ as part of the ``SELECT``: .. code:: sql - CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES; + CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL; Your output should resemble: @@ -911,7 +882,7 @@ Add stream of 3rd party orders into the existing output stream: .. code:: sql - INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES; + INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY; Your output should resemble: @@ -928,7 +899,7 @@ written to it: .. code:: sql - SELECT * FROM ALL_ORDERS EMIT CHANGES; + SELECT * FROM ALL_ORDERS; Your output should resemble the following. 
Note that there are messages from both source topics (denoted by ``LOCAL`` and ``3RD PARTY`` respectively). @@ -958,8 +929,8 @@ Your output should resemble: Query ID | Kafka Topic | Query String ------------------------------------------------------------------------------------------------------------------- - CSAS_ALL_ORDERS_0 | ALL_ORDERS | CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES; - InsertQuery_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES; + CSAS_ALL_ORDERS_0 | ALL_ORDERS | CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL; + InsertQuery_1 | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY; ------------------------------------------------------------------------------------------------------------------- .. insert-into_02_end diff --git a/docs/installation/server-config/avro-schema.rst b/docs/installation/server-config/avro-schema.rst index 46ab440f8bde..e878821ca986 100644 --- a/docs/installation/server-config/avro-schema.rst +++ b/docs/installation/server-config/avro-schema.rst @@ -121,7 +121,8 @@ generates an appropriate Avro schema for the new ``pageviews_avro`` stream, and CREATE STREAM pageviews_avro WITH (VALUE_FORMAT = 'AVRO') AS - SELECT * FROM pageviews_json; + SELECT * FROM pageviews_json + EMIT CHANGES; For more information, see `Changing Data Serialization Format from JSON to Avro `__ in the `Stream Processing Cookbook `__. diff --git a/docs/installation/server-config/config-reference.rst b/docs/installation/server-config/config-reference.rst index d0ce38f35627..e0e5874a15c1 100644 --- a/docs/installation/server-config/config-reference.rst +++ b/docs/installation/server-config/config-reference.rst @@ -274,7 +274,7 @@ For example, consider the statement: .. code:: sql - CREATE STREAM y AS SELECT f0 FROM x; + CREATE STREAM y AS SELECT f0 FROM x EMIT CHANGES; The statement selects a single field as the value of stream ``y``. If ``f0`` has the integer value ``10``, diff --git a/docs/operations.rst b/docs/operations.rst index b2fa1658f00f..4a6f1f3f976a 100644 --- a/docs/operations.rst +++ b/docs/operations.rst @@ -98,7 +98,7 @@ Troubleshooting ------------------------------------ SELECT query hangs and doesn’t stop? ------------------------------------ -Queries in KSQL, including non-persistent queries such as ``SELECT * FROM myTable``, are continuous streaming queries. +Queries in KSQL, including non-persistent queries such as ``SELECT * FROM myTable EMIT CHANGES``, are continuous streaming queries. Streaming queries will not stop unless explicitly terminated. To terminate a non-persistent query in the KSQL CLI you must type ``Ctrl + C``. diff --git a/docs/troubleshoot-ksql.rst b/docs/troubleshoot-ksql.rst index a101d3d9ed96..1dfe7fb6955c 100644 --- a/docs/troubleshoot-ksql.rst +++ b/docs/troubleshoot-ksql.rst @@ -8,10 +8,10 @@ This guide contains troubleshooting information for many KSQL issues. SELECT query does not stop ************************** -KSQL queries streams continuously and must be stopped explicitly. In the CLI, -use Ctrl-C to stop non-persistent queries, like ``SELECT * FROM myTable``. -To stop a persistent query created by CREATE STREAM AS SELECT or -CREATE TABLE AS SELECT, use the TERMINATE statement: ``TERMINATE query_id;``. +KSQL streaming queries must be stopped explicitly. In the CLI, +use Ctrl-C to stop non-persistent queries, like ``SELECT * FROM myTable EMIT CHANGES``. 
+To stop a persistent query created by ``CREATE STREAM AS SELECT`` or +``CREATE TABLE AS SELECT``, use the TERMINATE statement: ``TERMINATE query_id;``. For more information, see :ref:`ksql-terminate`. SELECT query returns no results diff --git a/docs/tutorials/basics-control-center.rst b/docs/tutorials/basics-control-center.rst index 457a6d1020aa..8f119cbbdf2a 100644 --- a/docs/tutorials/basics-control-center.rst +++ b/docs/tutorials/basics-control-center.rst @@ -211,7 +211,8 @@ end them with the TERMINATE statement. SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users - ON pageviews_original.userid = users.userid; + ON pageviews_original.userid = users.userid + EMIT CHANGES; Your output should resemble: @@ -276,7 +277,8 @@ You can assign properties in the KSQL Editor before you run your queries. CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched - WHERE gender = 'FEMALE'; + WHERE gender = 'FEMALE' + EMIT CHANGES; .. figure:: ../img/c3-ksql-set-auto-offset-reset.png :alt: Screenshot showing how to set a query property in the KSQL Editor page @@ -327,7 +329,7 @@ JSON file. .. code:: sql - SELECT * FROM PAGEVIEWS_FEMALE; + SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES; #. In the query results window, select some records and click **Download**. diff --git a/docs/tutorials/clickstream-docker.rst b/docs/tutorials/clickstream-docker.rst index 46da7552e913..c613d4c79110 100644 --- a/docs/tutorials/clickstream-docker.rst +++ b/docs/tutorials/clickstream-docker.rst @@ -227,7 +227,7 @@ Verify the data .. code:: sql - SELECT * FROM CLICKSTREAM LIMIT 5; + SELECT * FROM CLICKSTREAM EMIT CHANGES LIMIT 5; Your output should resemble: @@ -246,7 +246,7 @@ Verify the data .. code:: sql - SELECT * FROM EVENTS_PER_MIN LIMIT 5; + SELECT * FROM EVENTS_PER_MIN EMIT CHANGES LIMIT 5; Your output should resemble: @@ -264,7 +264,7 @@ Verify the data .. code:: sql - SELECT * FROM PAGES_PER_MIN LIMIT 5; + SELECT * FROM PAGES_PER_MIN EMIT CHANGES LIMIT 5; Your output should resemble: diff --git a/docs/tutorials/examples.rst b/docs/tutorials/examples.rst index 8b9e6b176e5b..8a6244e22b00 100644 --- a/docs/tutorials/examples.rst +++ b/docs/tutorials/examples.rst @@ -127,7 +127,8 @@ The following statement will generate a new stream, pageid, TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring FROM pageviews - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; Use a ``[ WHERE condition ]`` clause to select a subset of data. If you want to route streams with different criteria to different streams @@ -146,7 +147,8 @@ write multiple KSQL statements as follows: TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring FROM pageviews WHERE userid='User_1' OR userid='User_2' - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; .. code:: sql @@ -160,7 +162,8 @@ write multiple KSQL statements as follows: TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring FROM pageviews WHERE userid<>'User_1' AND userid<>'User_2' - PARTITION BY userid; + PARTITION BY userid + EMIT CHANGES; Joining ~~~~~~~ @@ -171,7 +174,8 @@ When joining objects the number of partitions in each must be the same. You can CREATE TABLE users_5part WITH (PARTITIONS=5) AS - SELECT * FROM USERS; + SELECT * FROM USERS + EMIT CHANGES; Now you can use the following query to create a new stream by joining the ``pageviews_transformed`` stream with the ``users_5part`` table. 
@@ -188,7 +192,8 @@ Now you can use the following query to create a new stream by joining the u.interests, u.contactinfo FROM pageviews_transformed pv - LEFT JOIN users_5part u ON pv.userid = u.userid; + LEFT JOIN users_5part u ON pv.userid = u.userid + EMIT CHANGES; Note that by default all the Kafka topics will be read from the current offset (aka the latest available data); however, in a stream-table join, @@ -208,7 +213,8 @@ Here is the query that would perform this count: SELECT regionid, count(*) FROM pageviews_enriched - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; The above query counts the pageviews from the time you start the query until you terminate the query. Note that we used CREATE TABLE AS SELECT @@ -227,7 +233,8 @@ so that we compute the pageview count per region every 1 minute: count(*) FROM pageviews_enriched WINDOW TUMBLING (SIZE 1 MINUTE) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; If you want to count the pageviews for only “Region_6” by female users for every 30 seconds, you can change the above query as the following: @@ -240,7 +247,8 @@ for every 30 seconds, you can change the above query as the following: FROM pageviews_enriched WINDOW TUMBLING (SIZE 30 SECONDS) WHERE UCASE(gender)='FEMALE' AND LCASE(regionid)='region_6' - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; UCASE and LCASE functions in KSQL are used to convert the values of gender and regionid columns to upper and lower case, so that you can @@ -259,7 +267,8 @@ window of 30 seconds that advances by 10 seconds: FROM pageviews_enriched WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; The next statement counts the number of pageviews per region for session windows with a session inactivity gap of 60 seconds. In other words, you @@ -273,7 +282,8 @@ counting/aggregation step per region. count(*) FROM pageviews_enriched WINDOW SESSION (60 SECONDS) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; Sometimes, you may want to include the bounds of the current window in the result so that it is more easily accessible to consumers of the data. The following statement extracts the start and @@ -288,7 +298,8 @@ end time of the current session window into fields within output rows. count(*) FROM pageviews_enriched WINDOW SESSION (60 SECONDS) - GROUP BY regionid; + GROUP BY regionid + EMIT CHANGES; Working with arrays and maps ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -316,7 +327,8 @@ zipcode for each user: timestring, gender, regionid - FROM pageviews_enriched; + FROM pageviews_enriched + EMIT CHANGES; .. _running-ksql-command-line: @@ -360,7 +372,7 @@ The following examples show common usage: .. code:: bash cat /path/to/local/application.sql - CREATE STREAM pageviews_copy AS SELECT * FROM pageviews; + CREATE STREAM pageviews_copy AS SELECT * FROM pageviews EMIT CHANGES; .. 
code:: bash diff --git a/docs/tutorials/examples.sql b/docs/tutorials/examples.sql index 0b9a9df8bd8c..a2c85c3731a1 100644 --- a/docs/tutorials/examples.sql +++ b/docs/tutorials/examples.sql @@ -23,7 +23,8 @@ pageid, \ TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HHmmss.SSS') AS timestring \ FROM pageviews \ - PARTITION BY userid; + PARTITION BY userid \ + EMIT CHANGES; CREATE STREAM pageviews_transformed_priority_1 \ WITH (TIMESTAMP='viewtime', \ PARTITIONS=5, \ @@ -34,7 +35,8 @@ TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HHmmss.SSS') AS timestring \ FROM pageviews \ WHERE userid='User_1' OR userid='User_2' \ - PARTITION BY userid; + PARTITION BY userid \ + EMIT CHANGES; CREATE STREAM pageviews_transformed_priority_2 \ WITH (TIMESTAMP='viewtime', \ PARTITIONS=5, \ @@ -45,10 +47,12 @@ TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HHmmss.SSS') AS timestring \ FROM pageviews \ WHERE userid<>'User_1' AND userid<>'User_2' \ - PARTITION BY userid; + PARTITION BY userid \ + EMIT CHANGES; CREATE TABLE users_5part \ WITH (PARTITIONS=5) AS \ - SELECT * FROM USERS; + SELECT * FROM USERS \ + EMIT CHANGES; CREATE STREAM pageviews_enriched AS \ SELECT pv.viewtime, \ pv.userid AS userid, \ @@ -59,38 +63,44 @@ u.interests, \ u.contactinfo \ FROM pageviews_transformed pv \ - LEFT JOIN users_5part u ON pv.userid = u.userid; + LEFT JOIN users_5part u ON pv.userid = u.userid \ + EMIT CHANGES; CREATE TABLE pageviews_per_region AS \ SELECT regionid, \ count(*) \ FROM pageviews_enriched \ - GROUP BY regionid; + GROUP BY regionid \ + EMIT CHANGES; CREATE TABLE pageviews_per_region_per_minute AS \ SELECT regionid, \ count(*) \ FROM pageviews_enriched \ WINDOW TUMBLING (SIZE 1 MINUTE) \ - GROUP BY regionid; + GROUP BY regionid \ + EMIT CHANGES; CREATE TABLE pageviews_per_region_per_30secs AS \ SELECT regionid, \ count(*) \ FROM pageviews_enriched \ WINDOW TUMBLING (SIZE 30 SECONDS) \ WHERE UCASE(gender)='FEMALE' AND LCASE(regionid)='region_6' \ - GROUP BY regionid; + GROUP BY regionid \ + EMIT CHANGES; CREATE TABLE pageviews_per_region_per_30secs10secs AS \ SELECT regionid, \ count(*) \ FROM pageviews_enriched \ WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) \ WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6' \ - GROUP BY regionid; + GROUP BY regionid \ + EMIT CHANGES; CREATE TABLE pageviews_per_region_per_session AS \ SELECT regionid, \ count(*) \ FROM pageviews_enriched \ WINDOW SESSION (60 SECONDS) \ - GROUP BY regionid; + GROUP BY regionid \ + EMIT CHANGES; CREATE STREAM pageviews_interest_contact AS \ SELECT interests[0] AS first_interest, \ contactinfo['zipcode'] AS zipcode, \ @@ -101,4 +111,5 @@ timestring, \ gender, \ regionid \ - FROM pageviews_enriched; \ No newline at end of file + FROM pageviews_enriched \ + EMIT CHANGES; \ No newline at end of file diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst index 296414785ada..091b3e5a0a32 100644 --- a/docs/tutorials/generate-custom-test-data.rst +++ b/docs/tutorials/generate-custom-test-data.rst @@ -362,6 +362,6 @@ Create the ``impressions2`` persistent streaming query: .. code:: sql - CREATE STREAM impressions2 as select * from impressions; + CREATE STREAM impressions2 as select * from impressions EMIT CHANGES;
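
The change applied throughout these diffs is mechanical: transient push queries and persistent ``CREATE ... AS SELECT`` statements both gain an ``EMIT CHANGES`` clause. As a minimal standalone sketch of the two forms (assuming a ``pageviews`` stream like the one used in the quickstart; the ``pageviews_page_1`` output stream is a hypothetical name used only for illustration):

.. code:: sql

    -- Transient push query: streams rows to the CLI until Ctrl-C or the LIMIT is reached.
    SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;

    -- Persistent push query: continuously writes matching rows to a new stream and backing topic.
    CREATE STREAM pageviews_page_1 AS
        SELECT * FROM pageviews
        WHERE pageid = 'Page_1'
        EMIT CHANGES;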