docs: start updating docs to include EMIT CHANGES for push queries #3472

Merged
15 changes: 9 additions & 6 deletions docs/capacity-planning.rst
@@ -82,7 +82,7 @@ Some queries require that the input stream be repartitioned so that all messages

.. code:: sql

CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;
DESCRIBE EXTENDED pageviews_by_page;

Your output should resemble:
@@ -92,7 +92,7 @@ Some queries require that the input stream be repartitioned so that all messages
...
Queries that write into this TABLE
-----------------------------------
id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

@@ -107,7 +107,7 @@ Some queries require that the input stream be repartitioned so that all messages
::

Type : QUERY
SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

Execution plan
--------------
@@ -313,7 +313,8 @@ out all the views that lasted less than 10 seconds:
WITH (PARTITIONS=64) AS
SELECT *
FROM pageviews_original
WHERE duration > 10;
WHERE duration > 10
EMIT CHANGES;

KSQL
++++
@@ -354,13 +355,15 @@ and then count up views by city:
CREATE STREAM pageviews_meaningful_with_user_info
WITH (PARTITIONS=64) AS
SELECT pv.viewtime, pv.userid, pv.pageid, pv.client_ip, pv.url, pv.duration, pv.from_url, u.city, u.country, u.gender, u.email
FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid;
FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid
EMIT CHANGES;

CREATE TABLE pageview_counts_by_city
WITH (PARTITIONS=64) AS
SELECT country, city, count(*)
FROM pageviews_meaningful_with_user_info
GROUP BY country, city;
GROUP BY country, city
EMIT CHANGES;

KSQL
++++
3 changes: 2 additions & 1 deletion docs/concepts/ksql-and-kafka-streams.rst
@@ -28,7 +28,8 @@ For example, to implement simple fraud-detection logic on a Kafka topic named

CREATE STREAM fraudulent_payments AS
SELECT fraudProbability(data) FROM payments
WHERE fraudProbability(data) > 0.8;
WHERE fraudProbability(data) > 0.8
EMIT CHANGES;
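As an illustrative sketch only (plain Python, not ksqlDB; ``fraud_probability`` stands in for the ``fraudProbability`` UDF and is an assumption here), the streaming filter behaves like this:

```python
def filter_fraudulent(payments, fraud_probability):
    """Keep only payments whose fraud score exceeds 0.8, yielding
    (payment, score) pairs the way the SELECT projects the scored column."""
    for payment in payments:
        score = fraud_probability(payment)
        if score > 0.8:
            yield payment, score

# Hypothetical scores for two payments; only the high-risk one passes.
scores = {"p1": 0.9, "p2": 0.1}
print(list(filter_fraudulent(["p1", "p2"], scores.get)))  # [('p1', 0.9)]
```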

The equivalent Java code on Kafka Streams might resemble:

3 changes: 2 additions & 1 deletion docs/concepts/ksql-architecture.rst
@@ -362,7 +362,8 @@ from the ``authorization_attempts`` stream:
WINDOW TUMBLING (SIZE 5 SECONDS)
WHERE region = 'west'
GROUP BY card_number
HAVING count(*) > 3;
HAVING count(*) > 3
EMIT CHANGES;

The KSQL engine translates the DML statement into a Kafka Streams application.
The application reads the source topic continuously, and whenever the
15 changes: 10 additions & 5 deletions docs/concepts/time-and-windows-in-ksql-queries.rst
@@ -240,7 +240,8 @@ run a query like this:
SELECT regionid, COUNT(*) FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;
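With SIZE 30 SECONDS and ADVANCE BY 10 SECONDS, every record falls into three overlapping windows. A hedged Python sketch of the window assignment (for intuition only, not ksqlDB internals):

```python
def hopping_windows(ts_ms, size_ms=30_000, advance_ms=10_000):
    """Return the [start, end) bounds of every hopping window containing ts_ms."""
    # The latest candidate window starts at the advance-aligned boundary
    # at or before the record's timestamp.
    start = (ts_ms // advance_ms) * advance_ms
    windows = []
    while start >= 0 and start + size_ms > ts_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)

# A record at t=25s lands in the windows starting at 0s, 10s, and 20s.
print(hopping_windows(25_000))  # [(0, 30000), (10000, 40000), (20000, 50000)]
```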

The hopping window's start time is inclusive, but the end time is exclusive.
This is important for non-overlapping windows, in which each record must be
@@ -270,7 +271,8 @@ per zip code per hour in an ``orders`` stream, you might run a query like this:
.. code:: sql

SELECT order_zipcode, TOPK(order_total, 5) FROM orders
WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode;
WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode
EMIT CHANGES;

Here's another example: to detect potential credit card fraud in an
``authorization_attempts`` stream, you might run a query for the number of
@@ -281,7 +283,8 @@ a time interval of five seconds.

SELECT card_number, count(*) FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number HAVING COUNT(*) > 3;
GROUP BY card_number HAVING COUNT(*) > 3
EMIT CHANGES;
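The grouping-and-thresholding logic of this query can be sketched in plain Python (an approximation for intuition, not the engine's implementation):

```python
from collections import defaultdict

def flag_possible_fraud(attempts, window_ms=5_000, threshold=3):
    """attempts: iterable of (timestamp_ms, card_number) pairs.
    Count attempts per card per tumbling window and keep only the
    groups that exceed the threshold, mirroring HAVING count(*) > 3."""
    counts = defaultdict(int)
    for ts, card in attempts:
        # Tumbling window start is inclusive; the end is exclusive.
        window_start = (ts // window_ms) * window_ms
        counts[(window_start, card)] += 1
    return {key: c for key, c in counts.items() if c > threshold}

events = [(0, "1234"), (1000, "1234"), (2000, "1234"), (4999, "1234"), (5000, "1234")]
# The attempt at t=5000 opens a new window, so only the first window is flagged.
print(flag_possible_fraud(events))  # {(0, '1234'): 4}
```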

The tumbling window's start time is inclusive, but the end time is exclusive.
This is important for non-overlapping windows, in which each record must be
@@ -325,7 +328,8 @@ per region:

SELECT regionid, COUNT(*) FROM pageviews
WINDOW SESSION (60 SECONDS)
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;

The start and end times for a session window are both inclusive, in contrast to
time windows.
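Session assignment can be sketched as follows (illustrative Python; ksqlDB's actual session merging is handled by Kafka Streams):

```python
def sessionize(timestamps_ms, gap_ms=60_000):
    """Group sorted event timestamps into session windows: a new session
    starts whenever the gap since the previous event exceeds gap_ms.
    Both session bounds are inclusive, unlike time windows."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][1] <= gap_ms:
            sessions[-1][1] = ts          # extend the current session
        else:
            sessions.append([ts, ts])     # start a new session
    return [tuple(s) for s in sessions]

# Events 10s and 40s apart merge into one session; a 71s gap starts a new one.
print(sessionize([0, 10_000, 50_000, 121_000]))  # [(0, 50000), (121000, 121000)]
```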
@@ -356,7 +360,8 @@ For example, to find orders that have shipped within the last hour from an
FROM new_orders o
INNER JOIN shipments s
WITHIN 1 HOURS
ON o.order_id = s.order_id;
ON o.order_id = s.order_id
EMIT CHANGES;

For more information on joins, see :ref:`join-streams-and-tables`.
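The WITHIN constraint of the join above can be sketched in Python (a batch approximation of the streaming join, for intuition only):

```python
def join_within(new_orders, shipments, within_ms=3_600_000):
    """Inner-join sketch: emit (order_id, order_ts, ship_ts) for pairs
    whose timestamps differ by at most within_ms, keyed on order_id.
    new_orders / shipments: iterables of (timestamp_ms, order_id)."""
    shipments_by_id = {}
    for ts, oid in shipments:
        shipments_by_id.setdefault(oid, []).append(ts)
    joined = []
    for o_ts, oid in new_orders:
        for s_ts in shipments_by_id.get(oid, []):
            if abs(o_ts - s_ts) <= within_ms:
                joined.append((oid, o_ts, s_ts))
    return joined

orders = [(0, "o1"), (0, "o2")]
ships = [(1_800_000, "o1"), (7_200_000, "o2")]  # o2 shipped 2 hours later
print(join_within(orders, ships))  # [('o1', 0, 1800000)]
```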

15 changes: 10 additions & 5 deletions docs/developer-guide/aggregate-streaming-data.rst
@@ -31,7 +31,8 @@ because the result of the query is a KSQL table.
SELECT regionid,
COUNT(*)
FROM pageviews
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;
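Because the result is a table, every input row updates a per-region count, and EMIT CHANGES streams each update. A minimal Python sketch of these semantics (illustrative only):

```python
from collections import defaultdict

def count_with_changes(pageviews):
    """Fold region ids into a count table and record each update the
    aggregation emits as a change (EMIT CHANGES semantics, sketched)."""
    table = defaultdict(int)
    changes = []
    for regionid in pageviews:
        table[regionid] += 1
        changes.append((regionid, table[regionid]))  # one change per input row
    return dict(table), changes

table, changes = count_with_changes(["r1", "r2", "r1"])
print(table)    # {'r1': 2, 'r2': 1}
print(changes)  # [('r1', 1), ('r2', 1), ('r1', 2)]
```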

Tombstone Records
=================
@@ -63,7 +64,8 @@ This query computes the pageview count per region per minute:
COUNT(*)
FROM pageviews
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;

To count the pageviews for “Region_6” by female users every
30 seconds, you can change the previous query to the following:
@@ -76,7 +78,8 @@ To count the pageviews for “Region_6” by female users every
FROM pageviews
WINDOW TUMBLING (SIZE 30 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE(regionid)='region_6'
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;

Aggregate Records Over a Hopping Window
=======================================
@@ -97,7 +100,8 @@ and substring matching.
FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;

Aggregate Records Over a Session Window
=======================================
@@ -113,7 +117,8 @@ the input data and performs the counting step per region.
COUNT(*)
FROM pageviews
WINDOW SESSION (60 SECONDS)
GROUP BY regionid;
GROUP BY regionid
EMIT CHANGES;

For more information, see :ref:`time-and-windows-in-ksql-queries`.

18 changes: 9 additions & 9 deletions docs/developer-guide/api.rst
@@ -47,7 +47,7 @@ Here's an example request that retrieves streaming data from ``TEST_STREAM``:
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM TEST_STREAM;",
"ksql": "SELECT * FROM TEST_STREAM EMIT CHANGES;",
"streamsProperties": {}
}'

@@ -217,7 +217,7 @@ statements use the ``/query`` endpoint.
Content-Type: application/vnd.ksql.v1+json

{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
@@ -232,7 +232,7 @@ statements use the ``/query`` endpoint.

[
{
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';",
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_HOME/create",
"commandStatus": {
"status":"SUCCESS",
@@ -241,7 +241,7 @@ statements use the ``/query`` endpoint.
"commandSequenceNumber":10
},
{
"statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
"statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_ALICE/create",
"commandStatus": {
"status":"SUCCESS",
@@ -266,7 +266,7 @@ similar to the example request above:
Content-Type: application/vnd.ksql.v1+json

{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;"
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;"
}

The second method is to submit the statements as separate requests and incorporate the interdependency by using ``commandSequenceNumber``.
@@ -279,7 +279,7 @@ Send the first request:
Content-Type: application/vnd.ksql.v1+json

{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';"
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;"
}

Make note of the ``commandSequenceNumber`` returned in the response:
@@ -291,7 +291,7 @@ Make note of the ``commandSequenceNumber`` returned in the response:

[
{
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';",
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_HOME/create",
"commandStatus": {
"status":"SUCCESS",
@@ -311,7 +311,7 @@ execute until after command number 10 has finished executing:
Content-Type: application/vnd.ksql.v1+json

{
"ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;",
"ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;",
"commandSequenceNumber":10
}
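The chaining above can be sketched in Python (illustrative; the response shape follows the examples on this page, and the statement texts are abbreviated):

```python
import json

def chained_request(previous_response_body, next_ksql):
    """Build the follow-up /ksql payload: pull commandSequenceNumber out of
    the first response so the second statement waits for the first."""
    statements = json.loads(previous_response_body)
    seq = statements[0]["commandSequenceNumber"]
    return {"ksql": next_ksql, "commandSequenceNumber": seq}

first_response = (
    '[{"statementText": "CREATE STREAM pageviews_home AS ...;",'
    ' "commandId": "stream/PAGEVIEWS_HOME/create",'
    ' "commandStatus": {"status": "SUCCESS", "message": "Stream created and running"},'
    ' "commandSequenceNumber": 10}]'
)
payload = chained_request(first_response, "CREATE TABLE pageviews_home_count AS ...;")
print(payload["commandSequenceNumber"])  # 10
```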

@@ -346,7 +346,7 @@ The ``/query`` resource lets you stream the output records of a ``SELECT`` statement
Content-Type: application/vnd.ksql.v1+json

{
"ksql": "SELECT * FROM pageviews;",
"ksql": "SELECT * FROM pageviews EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
11 changes: 7 additions & 4 deletions docs/developer-guide/create-a-stream.rst
@@ -45,7 +45,8 @@ In the KSQL CLI, paste the following CREATE STREAM statement:
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
VALUE_FORMAT='DELIMITED');

Your output should resemble:

@@ -155,7 +156,8 @@ like this:
WITH (KAFKA_TOPIC='pageviews',
VALUE_FORMAT='DELIMITED',
KEY='pageid',
TIMESTAMP='viewtime');

Confirm that the TIMESTAMP field is ``viewtime`` by using the DESCRIBE EXTENDED
statement:
@@ -223,7 +225,8 @@ results from a persistent query that matches "introductory" pages that have a

CREATE STREAM pageviews_intro AS
SELECT * FROM pageviews
WHERE pageid < 'Page_20';
WHERE pageid < 'Page_20'
EMIT CHANGES;

Your output should resemble:

@@ -273,7 +276,7 @@ Your output should resemble:

Query ID | Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------------------------
CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20';
CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES;
--------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

30 changes: 24 additions & 6 deletions docs/developer-guide/create-a-table.rst
@@ -106,7 +106,7 @@ statement:

.. code:: sql

SELECT * FROM users;
SELECT * FROM users EMIT CHANGES;

Your output should resemble:

@@ -144,7 +144,8 @@ results from a persistent query for users that have ``gender`` set to ``FEMALE``

CREATE TABLE users_female AS
SELECT userid, gender, regionid FROM users
WHERE gender='FEMALE';
WHERE gender='FEMALE'
EMIT CHANGES;

Your output should resemble:

@@ -206,7 +207,7 @@ Your output should resemble:

Query ID | Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------
CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE';
CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE' EMIT CHANGES;
-----------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

@@ -225,7 +226,8 @@ function like COUNT(*) in the SELECT clause.
CREATE TABLE pageviews_table AS
SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL
FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY viewtime, userid, pageid;
GROUP BY viewtime, userid, pageid
EMIT CHANGES;

Your output should resemble:

Expand All @@ -237,11 +239,11 @@ Your output should resemble:
---------------------------
ksql>

Inspect the table by using a SELECT statement.
Observe the changes happening to the table using a streaming SELECT statement.

.. code:: sql

SELECT * FROM pageviews_table;
SELECT * FROM pageviews_table EMIT CHANGES;

Your output should resemble:

@@ -257,6 +259,22 @@ Your output should resemble:
^CQuery terminated
ksql>

Look up the value for a specific key within the table using a SELECT statement.

.. code:: sql

SELECT * FROM pageviews_table WHERE ROWKEY='1557183929488|+|User_9|+|Page_39';

Your output should resemble:

::

ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | VIEWTIME BIGINT | USERID STRING | PAGEID STRING | TOTAL BIGINT
----------------------------------------------------------------------------------------------------------------------------
1557183929488|+|User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1
----------------------------------------------------------------------------------------------------------------------------
ksql>

Delete a KSQL Table
*******************

2 changes: 1 addition & 1 deletion docs/developer-guide/implement-a-udf.rst
@@ -362,7 +362,7 @@ Use the MULTIPLY function in a query. If you follow the steps in

::

SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original;
SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original EMIT CHANGES;

Your output should resemble:

6 changes: 4 additions & 2 deletions docs/developer-guide/join-streams-and-tables.rst
@@ -30,7 +30,8 @@ combination of a ``pageviews`` stream and a ``users`` table:

CREATE STREAM pageviews_enriched AS
SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews
LEFT JOIN users ON pageviews.userid = users.userid;
LEFT JOIN users ON pageviews.userid = users.userid
EMIT CHANGES;


For the full code example, see :ref:`ksql_quickstart-docker`.
@@ -51,7 +52,8 @@ expected time of two hours.
CREATE STREAM late_orders AS
SELECT o.orderid, o.itemid FROM orders o
FULL OUTER JOIN shipments s WITHIN 2 HOURS
ON s.orderid = o.orderid WHERE s.orderid IS NULL;
ON s.orderid = o.orderid WHERE s.orderid IS NULL
EMIT CHANGES;

Joins and Windows
*****************