Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a schema.sql shortcut file #28

Open
wants to merge 2 commits into
base: lts
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ecommerce-redpanda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ You'll need to have [docker and docker-compose installed](https://materialize.co
```

_(This is just a shortcut to a docker container with postgres-client pre-installed, if you already have psql you could run `psql -U materialize -h localhost -p 6875 materialize`)_

> :zap: If you want to skip the schema creation steps and load everything at once, you can run `psql -U materialize -h localhost -p 6875 -f schema.sql`

7. Now that you're in the Materialize CLI, define all of the tables in `mysql.shop` as Kafka sources:

Expand Down
114 changes: 114 additions & 0 deletions ecommerce-redpanda/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
-- CREATE SOURCES

CREATE SOURCE purchases
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.purchases'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
ENVELOPE DEBEZIUM;

CREATE SOURCE items
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.items'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
ENVELOPE DEBEZIUM;

CREATE SOURCE users
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.users'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
ENVELOPE DEBEZIUM;

CREATE SOURCE json_pageviews
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
FORMAT BYTES;



-- CREATE NON-MATERIALIZED VIEW TO PARSE JSON PAGEVIEWS

CREATE VIEW pageview_stg AS
SELECT
*,
regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
(regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id
FROM (
SELECT
(data->'user_id')::INT AS user_id,
data->>'url' AS url,
data->>'channel' AS channel,
(data->>'received_at')::double AS received_at
FROM (
SELECT CAST(data AS jsonb) AS data
FROM (
SELECT convert_from(data, 'utf8') AS data
FROM json_pageviews
)
)
);


-- CREATE ANALYTICAL VIEWS

CREATE MATERIALIZED VIEW purchases_by_item AS
SELECT
item_id,
SUM(purchase_price) as revenue,
COUNT(id) AS orders,
SUM(quantity) AS items_sold
FROM purchases GROUP BY 1;

CREATE MATERIALIZED VIEW pageviews_by_item AS
SELECT
target_id as item_id,
COUNT(*) AS pageviews
FROM pageview_stg
WHERE pageview_type = 'products'
GROUP BY 1;

CREATE MATERIALIZED VIEW item_summary AS
SELECT
items.name,
items.category,
SUM(purchases_by_item.items_sold) as items_sold,
SUM(purchases_by_item.orders) as orders,
SUM(purchases_by_item.revenue) as revenue,
SUM(pageviews_by_item.pageviews) as pageviews,
SUM(purchases_by_item.orders) / SUM(pageviews_by_item.pageviews)::FLOAT AS conversion_rate
FROM items
JOIN purchases_by_item ON purchases_by_item.item_id = items.id
JOIN pageviews_by_item ON pageviews_by_item.item_id = items.id
GROUP BY 1, 2;


-- CREATE USER-FACING ANALYTICS VIEWS

CREATE MATERIALIZED VIEW profile_views_per_minute_last_10 AS
SELECT
target_id as user_id,
date_trunc('minute', to_timestamp(received_at)) as received_at_minute,
COUNT(*) as pageviews
FROM pageview_stg
WHERE
pageview_type = 'profiles' AND
mz_logical_timestamp() < (received_at*1000 + 600000)::numeric
GROUP BY 1, 2;

CREATE MATERIALIZED VIEW profile_views AS
SELECT
target_id AS owner_id,
user_id AS viewer_id,
received_at AS received_at
FROM (SELECT DISTINCT target_id FROM pageview_stg) grp,
LATERAL (
SELECT user_id, received_at FROM pageview_stg
WHERE target_id = grp.target_id
ORDER BY received_at DESC LIMIT 10
);

CREATE MATERIALIZED VIEW profile_views_enriched AS
SELECT
owner.id as owner_id,
owner.email as owner_email,
viewers.id as viewer_id,
viewers.email as viewer_email,
profile_views.received_at
FROM profile_views
JOIN users owner ON profile_views.owner_id = owner.id
JOIN users viewers ON profile_views.viewer_id = viewers.id;