From 11fb9f12b4e8b5919f32662547059924c6822296 Mon Sep 17 00:00:00 2001
From: Andy Hattemer
Date: Tue, 26 Apr 2022 22:50:10 -0400
Subject: [PATCH 1/2] Create Schema.sql

---
 ecommerce-redpanda/schema.sql | 114 ++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)
 create mode 100644 ecommerce-redpanda/schema.sql

diff --git a/ecommerce-redpanda/schema.sql b/ecommerce-redpanda/schema.sql
new file mode 100644
index 00000000..40c4640c
--- /dev/null
+++ b/ecommerce-redpanda/schema.sql
@@ -0,0 +1,114 @@
+-- CREATE SOURCES
+-- purchases, items and users read Debezium-enveloped Avro topics named mysql.shop.<table>.
+CREATE SOURCE purchases
+FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.purchases'
+FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
+ENVELOPE DEBEZIUM;
+
+CREATE SOURCE items
+FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.items'
+FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
+ENVELOPE DEBEZIUM;
+
+CREATE SOURCE users
+FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.users'
+FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
+ENVELOPE DEBEZIUM;
+-- pageviews arrive as raw bytes; pageview_stg below decodes them as UTF-8 JSON.
+CREATE SOURCE json_pageviews
+FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
+FORMAT BYTES;
+
+
+
+-- CREATE NON-MATERIALIZED VIEW TO PARSE JSON PAGEVIEWS
+-- When the url matches, pageview_type is 'products' or 'profiles' and target_id is the numeric id that follows it.
+CREATE VIEW pageview_stg AS
+    SELECT
+        *,
+        regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
+        (regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id
+    FROM (
+        SELECT
+            (data->'user_id')::INT AS user_id,
+            data->>'url' AS url,
+            data->>'channel' AS channel,
+            (data->>'received_at')::double AS received_at
+        FROM (
+            SELECT CAST(data AS jsonb) AS data
+            FROM (
+                SELECT convert_from(data, 'utf8') AS data
+                FROM json_pageviews
+            )
+        )
+    );
+
+
+-- CREATE ANALYTICAL VIEWS
+
+CREATE MATERIALIZED VIEW purchases_by_item AS
+    SELECT
+        item_id,
+        SUM(purchase_price) as revenue,
+        COUNT(id) AS orders,
+        SUM(quantity) AS items_sold
+    FROM purchases GROUP BY 1;
+
+CREATE MATERIALIZED VIEW pageviews_by_item AS
+    SELECT
+        target_id as item_id,
+        COUNT(*) AS pageviews
+    FROM pageview_stg
+    WHERE pageview_type = 'products'
+    GROUP BY 1;
+-- Inner joins: an item appears only once it has both a purchase and a product pageview.
+CREATE MATERIALIZED VIEW item_summary AS
+    SELECT
+        items.name,
+        items.category,
+        SUM(purchases_by_item.items_sold) as items_sold,
+        SUM(purchases_by_item.orders) as orders,
+        SUM(purchases_by_item.revenue) as revenue,
+        SUM(pageviews_by_item.pageviews) as pageviews,
+        SUM(purchases_by_item.orders) / SUM(pageviews_by_item.pageviews)::FLOAT AS conversion_rate
+    FROM items
+    JOIN purchases_by_item ON purchases_by_item.item_id = items.id
+    JOIN pageviews_by_item ON pageviews_by_item.item_id = items.id
+    GROUP BY 1, 2;
+
+
+-- CREATE USER-FACING ANALYTICS VIEWS
+-- The mz_logical_timestamp() predicate drops a pageview once the clock passes received_at*1000 + 600000 (presumably ms, i.e. 10 minutes).
+CREATE MATERIALIZED VIEW profile_views_per_minute_last_10 AS
+    SELECT
+        target_id as user_id,
+        date_trunc('minute', to_timestamp(received_at)) as received_at_minute,
+        COUNT(*) as pageviews
+    FROM pageview_stg
+    WHERE
+        pageview_type = 'profiles' AND
+        mz_logical_timestamp() < (received_at*1000 + 600000)::numeric
+    GROUP BY 1, 2;
+-- Ten most recent 'profiles' pageviews per target; the type filter keeps product ids from being read as profile owner ids.
+CREATE MATERIALIZED VIEW profile_views AS
+    SELECT
+        target_id AS owner_id,
+        user_id AS viewer_id,
+        received_at AS received_at
+    FROM (SELECT DISTINCT target_id FROM pageview_stg WHERE pageview_type = 'profiles') grp,
+    LATERAL (
+        SELECT user_id, received_at FROM pageview_stg
+        WHERE target_id = grp.target_id AND pageview_type = 'profiles'
+        ORDER BY received_at DESC LIMIT 10
+    );
+
+CREATE MATERIALIZED VIEW profile_views_enriched AS
+    SELECT
+        owner.id as owner_id,
+        owner.email as owner_email,
+        viewers.id as viewer_id,
+        viewers.email as viewer_email,
+        profile_views.received_at
+    FROM profile_views
+    JOIN users owner ON profile_views.owner_id = owner.id
+    JOIN users viewers ON profile_views.viewer_id = viewers.id;

From 134b4d831f90588b1e1ef09e3d92f848d5d1e1b3 Mon Sep 17 00:00:00 2001
From: Andy Hattemer
Date: Wed, 27 Apr 2022 07:46:46 -0400
Subject: [PATCH 2/2] Add a note in the readme

---
 ecommerce-redpanda/README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ecommerce-redpanda/README.md b/ecommerce-redpanda/README.md
index 4f122083..4c082a28 100644
--- a/ecommerce-redpanda/README.md
+++ b/ecommerce-redpanda/README.md
@@ -75,6 +75,8 @@ You'll need to have [docker and docker-compose installed](https://materialize.co
     ```
 
     _(This is just a shortcut to a docker container with postgres-client pre-installed, if you already have psql you could run `psql -U materialize -h localhost -p 6875 materialize`)_
+
+    > :zap: If you want to skip the schema creation steps and load everything at once, you can run `psql -U materialize -h localhost -p 6875 -f schema.sql`
 
 7. Now that you're in the Materialize CLI, define all of the tables in `mysql.shop` as Kafka sources:
 