
DEVX-2298: address how nulls are handled in ksqlDB; DEVX-2310: rename ksqlDB query #316

Merged
merged 11 commits on Jan 6, 2021
Binary file modified docs/images/cluster_raleigh.png
Binary file modified docs/images/create_topic.png
Binary file modified docs/images/ksql_link.png
Binary file modified docs/images/ksql_streams_list.png
Binary file modified docs/images/ksqldb_flow.png
Binary file modified docs/images/landing_page.png
Binary file modified docs/images/topic_list_wikipedia.png
23 changes: 7 additions & 16 deletions docs/index.rst
@@ -102,7 +102,7 @@ It generates the keys and certificates, brings up the Docker containers, and con
You can run it with optional settings:

- ``CLEAN``: controls whether certificates and the locally built |kconnect| image are regenerated in between runs
- ``C3_KSQLDB_HTTPS``: sets |c3| and ksqlDB server to use ``HTTP`` or ``HTTPS`` (default: ``HTTP``)
- ``C3_KSQLDB_HTTPS``: controls whether |c3| and the ksqlDB server use ``HTTP`` (when the parameter is ``false``) or ``HTTPS`` (when it is ``true``) (default: ``false``, i.e. ``HTTP``)
- ``VIZ``: enables Elasticsearch and Kibana (default: ``true``)
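Combining the settings above, a first run that regenerates certificates, serves |c3| and ksqlDB over HTTPS, and skips the Elasticsearch/Kibana stack might look like the following sketch. The ``./scripts/start.sh`` path is an assumption based on the cp-demo layout and is not taken from this PR; adjust it to your checkout.

```shell
# Hypothetical invocation (path assumed, not shown in this PR):
# regenerate certs and the locally built Connect image, use HTTPS
# for Control Center and ksqlDB, and skip Elasticsearch/Kibana.
CLEAN=true C3_KSQLDB_HTTPS=true VIZ=false ./scripts/start.sh
```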

#. To run ``cp-demo`` the first time with defaults, run the following command. This takes a few minutes to complete.
@@ -199,14 +199,17 @@ Log into |c3|
- Chrome: click on ``Advanced`` and when the window expands, click on ``Proceed to localhost (unsafe)``.

.. figure:: images/c3-chrome-cert-warning.png
:width: 500px

- Safari: open a new private browsing window (``Shift + ⌘ + N``), click on ``Show Details`` and when the window expands, click on ``visit this website``.

.. figure:: images/c3-safari-cert-warning.png
:width: 500px

#. At the login screen, log into |c3| as user ``superUser`` with password ``superUser``, which has super user access to the cluster. You may also log in as :devx-cp-demo:`other users|scripts/security/ldap_users` to see how each user's view changes depending on their permissions.

.. figure:: images/c3-login.png
:width: 500px


Brokers
@@ -215,10 +218,11 @@
#. Select the cluster named "Kafka Raleigh".

.. figure:: images/cluster_raleigh.png
:width: 500px

#. Click on "Brokers".

#. View the status of the Brokers in the cluster:
#. View the status of the brokers in the cluster:

.. figure:: images/landing_page.png

@@ -366,19 +370,7 @@ Its embedded producer is configured to be idempotent, exactly-once in order semantics
.. figure:: images/ksql_properties.png
:alt: image

#. This example creates two streams, ``EN_WIKIPEDIA_GT_1`` and ``EN_WIKIPEDIA_GT_1_COUNTS``, to demonstrate how ksqlDB windows work. ``EN_WIKIPEDIA_GT_1`` counts occurrences within a tumbling window, and for a given key it writes a ``null`` into the table on the first message seen. The underlying Kafka topic for ``EN_WIKIPEDIA_GT_1`` does not filter out those nulls. To send only the counts greater than one downstream, ``EN_WIKIPEDIA_GT_1_COUNTS`` is backed by a separate Kafka topic that does filter out the nulls (its query includes the clause ``where ROWTIME is not null``). From the bash prompt, view those underlying Kafka topics.

- View messages in the topic ``EN_WIKIPEDIA_GT_1`` (jump to offset 0/partition 0), and notice the nulls:

.. figure:: images/messages_in_EN_WIKIPEDIA_GT_1.png
:alt: image

- For comparison, view messages in the topic ``EN_WIKIPEDIA_GT_1_COUNTS`` (jump to offset 0/partition 0), and notice no nulls:

.. figure:: images/messages_in_EN_WIKIPEDIA_GT_1_COUNTS.png
:alt: image
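To make the null handling concrete, here is a minimal Python sketch — an illustration, not ksqlDB code — of how a windowed aggregate with a ``HAVING count(*) > 1`` condition can emit ``null`` changelog records for keys that have not yet passed the filter, and how a downstream ``WHERE ... IS NOT NULL``-style filter removes them:

```python
# Simulated changelog of a counting table with HAVING count(*) > 1:
# while a key's count does not satisfy the HAVING clause, a null
# ("tombstone") record is emitted for that key, mirroring the nulls
# seen in the EN_WIKIPEDIA_GT_1 topic above.
from collections import defaultdict

def changelog_with_having(events, min_count=2):
    """Yield (key, value) changelog records; value is None (a null)
    until the key's count satisfies the HAVING threshold."""
    counts = defaultdict(int)
    for key in events:
        counts[key] += 1
        if counts[key] >= min_count:
            yield (key, counts[key])   # passes HAVING: emit the count
        else:
            yield (key, None)          # fails HAVING: emit a null

records = list(changelog_with_having(["alice", "alice", "bob"]))
# Downstream filter, analogous to "where ROWTIME is not null":
filtered = [r for r in records if r[1] is not None]
print(records)   # [('alice', None), ('alice', 2), ('bob', None)]
print(filtered)  # [('alice', 2)]
```

The first record for each key is a null, exactly like the first-seen-message nulls described above; only the filtered view carries counts greater than one downstream.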

11. The `ksqlDB processing log <https://docs.confluent.io/current/ksql/docs/developer-guide/processing-log.html>`__ captures per-record errors during processing to help developers debug their ksqlDB queries. In this example, the processing log uses mutual TLS (mTLS) authentication, as configured in the custom :devx-cp-demo:`log4j properties file|scripts/helper/log4j-secure.properties`, to write entries into a Kafka topic. To see it in action, in the ksqlDB editor run the following "bad" query for 20 seconds:
#. The `ksqlDB processing log <https://docs.confluent.io/current/ksql/docs/developer-guide/processing-log.html>`__ captures per-record errors during processing to help developers debug their ksqlDB queries. In this example, the processing log uses mutual TLS (mTLS) authentication, as configured in the custom :devx-cp-demo:`log4j properties file|scripts/helper/log4j-secure.properties`, to write entries into a Kafka topic. To see it in action, in the ksqlDB editor run the following "bad" query for 20 seconds:

.. sourcecode:: bash

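Once the bad query has run, the per-record errors land in the processing log topic. They can also be inspected from the ksqlDB editor via the ``KSQL_PROCESSING_LOG`` stream — note this stream name and query are an assumption based on ksqlDB's default processing-log settings, which this PR does not show:

```sql
-- Inspect recent processing-log entries from the ksqlDB editor.
-- KSQL_PROCESSING_LOG is the default auto-created stream; adjust
-- if ksql.logging.processing.stream.name is overridden.
SELECT message FROM KSQL_PROCESSING_LOG EMIT CHANGES LIMIT 5;
```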
@@ -775,7 +767,6 @@ The security in place between |sr| and the end clients, e.g. ``appSA``, is as fo

[
"wikipedia.parsed.replica-value",
"EN_WIKIPEDIA_GT_1_COUNTS-value",
"WIKIPEDIABOT-value",
"EN_WIKIPEDIA_GT_1-value",
"_confluent-ksql-ksql-clusterquery_CTAS_EN_WIKIPEDIA_GT_1_7-Aggregate-Aggregate-Materialize-changelog-value",
2 changes: 1 addition & 1 deletion scripts/consumers/listen.sh
@@ -1,6 +1,6 @@
#!/bin/bash

for t in wikipedia.parsed WIKIPEDIABOT WIKIPEDIANOBOT EN_WIKIPEDIA_GT_1_COUNTS; do
for t in wikipedia.parsed WIKIPEDIABOT WIKIPEDIANOBOT; do

echo -e "\nSample message from Topic $t"
docker exec connect kafka-avro-console-consumer --bootstrap-server kafka1:11091,kafka2:11092 \
17 changes: 0 additions & 17 deletions scripts/consumers/listen_EN_WIKIPEDIA_GT_1_COUNTS.sh

This file was deleted.

2 changes: 1 addition & 1 deletion scripts/helper/create-topics.sh
@@ -35,7 +35,7 @@ export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DE
--partitions 2

# Create Kafka topics with prefix WIKIPEDIA or EN_WIKIPEDIA, using ksqlDBUser principal
for t in WIKIPEDIABOT WIKIPEDIANOBOT EN_WIKIPEDIA_GT_1 EN_WIKIPEDIA_GT_1_COUNTS
for t in WIKIPEDIABOT WIKIPEDIANOBOT EN_WIKIPEDIA_GT_1
do
export KAFKA_LOG4J_OPTS="-Dlog4j.rootLogger=DEBUG,stdout -Dlog4j.logger.kafka=DEBUG,stdout" && kafka-topics \
--bootstrap-server kafka1:11091 \
1 change: 0 additions & 1 deletion scripts/ksqlDB/statements.sql
@@ -3,4 +3,3 @@ CREATE STREAM wikipedianobot AS SELECT *, (length->new - length->old) AS BYTECHA
CREATE STREAM wikipediabot AS SELECT *, (length->new - length->old) AS BYTECHANGE FROM wikipedia WHERE bot = true AND length IS NOT NULL AND length->new IS NOT NULL AND length->old IS NOT NULL;
CREATE TABLE en_wikipedia_gt_1 AS SELECT user, meta->uri AS URI, count(*) AS COUNT FROM wikipedia WINDOW TUMBLING (size 300 second) WHERE meta->domain = 'commons.wikimedia.org' GROUP BY user, meta->uri HAVING count(*) > 1;
CREATE STREAM en_wikipedia_gt_1_stream (USER string, URI string, COUNT bigint) WITH (kafka_topic='EN_WIKIPEDIA_GT_1', value_format='AVRO');
CREATE STREAM en_wikipedia_gt_1_counts AS SELECT * FROM en_wikipedia_gt_1_stream where ROWTIME is not null;
3 changes: 1 addition & 2 deletions scripts/start.sh
@@ -148,9 +148,8 @@ retry $MAX_WAIT host_check_ksqlDBserver_up || exit 1
echo -e "\nRun ksqlDB queries (takes about 1 minute):"
${DIR}/ksqlDB/run_ksqlDB.sh

echo -e "\nStart consumers for additional topics: WIKIPEDIANOBOT, EN_WIKIPEDIA_GT_1_COUNTS"
echo -e "\nStart additional consumer for topic WIKIPEDIANOBOT"
${DIR}/consumers/listen_WIKIPEDIANOBOT.sh
${DIR}/consumers/listen_EN_WIKIPEDIA_GT_1_COUNTS.sh

echo
echo