Skip to content

Commit

Permalink
improve queries and timescale config
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Feb 2, 2025
1 parent 5848305 commit e341527
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 107 deletions.
9 changes: 8 additions & 1 deletion pkg/config/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,19 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
postgres.WithQos(cf.MessageQueue.Postgres.Qos),
)
case "rabbitmq":
cleanup1, mq = rabbitmq.New(
cleanupRabbit, rabbitMq := rabbitmq.New(
rabbitmq.WithURL(cf.MessageQueue.RabbitMQ.URL),
rabbitmq.WithLogger(&l),
rabbitmq.WithQos(cf.MessageQueue.RabbitMQ.Qos),
rabbitmq.WithDisableTenantExchangePubs(cf.Runtime.DisableTenantPubs),
)

if rabbitMq == nil {
return nil, nil, fmt.Errorf("could not create rabbitmq message queue")
}

cleanup1 = cleanupRabbit
mq = rabbitMq
}

ing, err = ingestor.NewIngestor(
Expand Down
68 changes: 39 additions & 29 deletions pkg/repository/v2/timescalev2/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,29 @@ JOIN
-- name: ListTasks :many
WITH task_statuses AS (
SELECT
tenant_id::uuid,
task_id::bigint,
task_inserted_at::timestamptz,
status::v2_readable_status_olap,
max_retry_count::int
s.tenant_id::uuid as tenant_id,
s.task_id::bigint as id,
s.task_inserted_at::timestamptz as inserted_at,
s.status::v2_readable_status_olap as status,
s.max_retry_count::int as max_retry_count,
t.external_id as external_id,
t.queue as queue,
t.action_id as action_id,
t.step_id as step_id,
t.workflow_id as workflow_id,
t.schedule_timeout as schedule_timeout,
t.step_timeout as step_timeout,
t.priority as priority,
t.sticky as sticky,
t.display_name as display_name
FROM
v2_cagg_task_status
v2_cagg_task_status s
JOIN
v2_tasks_olap t ON t.tenant_id = s.tenant_id AND t.id = s.task_id AND t.inserted_at = s.task_inserted_at
WHERE
tenant_id = @tenantId::uuid
s.tenant_id = @tenantId::uuid
AND bucket >= @createdAfter::timestamptz
ORDER BY bucket DESC, task_inserted_at DESC, task_id DESC
ORDER BY bucket DESC, s.task_inserted_at DESC, s.task_id DESC
LIMIT 50
), finished_ats AS (
SELECT
Expand All @@ -114,7 +126,7 @@ WITH task_statuses AS (
FROM
v2_task_events_olap e
JOIN
task_statuses ts ON ts.task_id = e.task_id AND ts.tenant_id = e.tenant_id AND ts.task_inserted_at = e.task_inserted_at AND ts.max_retry_count = e.retry_count
task_statuses ts ON ts.id = e.task_id AND ts.tenant_id = e.tenant_id AND ts.inserted_at = e.task_inserted_at AND ts.max_retry_count = e.retry_count
WHERE
e.readable_status = ANY(ARRAY['COMPLETED', 'FAILED', 'CANCELLED']::v2_readable_status_olap[])
GROUP BY e.task_id
Expand All @@ -125,37 +137,35 @@ WITH task_statuses AS (
FROM
v2_task_events_olap e
JOIN
task_statuses ts ON ts.task_id = e.task_id AND ts.tenant_id = e.tenant_id AND ts.task_inserted_at = e.task_inserted_at AND ts.max_retry_count = e.retry_count
task_statuses ts ON ts.id = e.task_id AND ts.tenant_id = e.tenant_id AND ts.inserted_at = e.task_inserted_at AND ts.max_retry_count = e.retry_count
WHERE
e.event_type = 'STARTED'
GROUP BY e.task_id
)
SELECT
t.tenant_id,
t.id,
t.inserted_at,
t.external_id,
t.queue,
t.action_id,
t.step_id,
t.workflow_id,
t.schedule_timeout,
t.step_timeout,
t.priority,
t.sticky,
t.display_name,
ts.tenant_id,
ts.id,
ts.inserted_at,
ts.external_id,
ts.queue,
ts.action_id,
ts.step_id,
ts.workflow_id,
ts.schedule_timeout,
ts.step_timeout,
ts.priority,
ts.sticky,
ts.display_name,
ts.status::v2_readable_status_olap as status,
f.finished_at::timestamptz as finished_at,
s.started_at::timestamptz as started_at
FROM
v2_tasks_olap t
JOIN
task_statuses ts ON ts.task_id = t.id AND ts.tenant_id = t.tenant_id AND ts.task_inserted_at = t.inserted_at
task_statuses ts
LEFT JOIN
finished_ats f ON f.task_id = t.id
finished_ats f ON f.task_id = ts.id
LEFT JOIN
started_ats s ON s.task_id = t.id
ORDER BY t.inserted_at DESC, t.id DESC;
started_ats s ON s.task_id = ts.id
ORDER BY ts.inserted_at DESC, ts.id DESC;

-- name: ListTaskEvents :many
WITH aggregated_events AS (
Expand Down
68 changes: 39 additions & 29 deletions pkg/repository/v2/timescalev2/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 16 additions & 48 deletions sql/schema/timescale.sql
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ CREATE INDEX v2_task_events_olap_task_id_idx ON v2_task_events_olap (task_id);
-- Convert v2_task_events_olap into a TimescaleDB hypertable, using a range
-- dimension on task_inserted_at with an interval of 1 day.
SELECT * from create_hypertable('v2_task_events_olap', by_range('task_inserted_at', INTERVAL '1 day'));

CREATE MATERIALIZED VIEW v2_cagg_task_status
WITH (timescaledb.continuous) AS
WITH (timescaledb.continuous, timescaledb.materialized_only = false, timescaledb.create_group_indexes = false) AS
SELECT
tenant_id,
task_id,
Expand All @@ -118,14 +118,18 @@ SELECT
max(retry_count) AS max_retry_count
FROM v2_task_events_olap
GROUP BY tenant_id, task_id, task_inserted_at, bucket
ORDER BY bucket DESC, task_inserted_at DESC;
ORDER BY bucket DESC, task_inserted_at DESC
WITH NO DATA;

ALTER MATERIALIZED VIEW v2_cagg_task_status set (timescaledb.materialized_only = false);
CREATE INDEX v2_cagg_task_status_bucket_tenant_id_status_idx ON v2_cagg_task_status (bucket, tenant_id, status);

CREATE INDEX v2_cagg_task_status_tenant_id_status_idx ON v2_cagg_task_status (tenant_id, status);
-- Register a refresh policy for the v2_cagg_task_status continuous aggregate:
-- schedule_interval and end_offset are both 1 minute, and start_offset is NULL.
-- NOTE(review): a NULL start_offset presumably leaves the old edge of the
-- refresh window unbounded, so each run may consider the full history --
-- confirm against the TimescaleDB documentation for the deployed version.
SELECT add_continuous_aggregate_policy('v2_cagg_task_status',
start_offset => NULL,
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW v2_cagg_status_metrics
WITH (timescaledb.continuous)
WITH (timescaledb.continuous, timescaledb.materialized_only = false)
AS
SELECT
time_bucket('5 minutes', bucket) AS bucket_2,
Expand All @@ -137,46 +141,10 @@ CREATE MATERIALIZED VIEW v2_cagg_status_metrics
COUNT(*) FILTER (WHERE status = 'FAILED') AS failed_count
FROM v2_cagg_task_status
GROUP BY bucket_2, tenant_id
ORDER BY bucket_2;

ALTER MATERIALIZED VIEW v2_cagg_status_metrics set (timescaledb.materialized_only = false);

-- CREATE TABLE v2_task_statuses (
-- tenant_id UUID NOT NULL,
-- task_id BIGINT NOT NULL,
-- inserted_at TIMESTAMPTZ(3) NOT NULL,
-- readable_status v2_readable_status_olap NOT NULL,
-- retry_count INT NOT NULL DEFAULT 0,

-- PRIMARY KEY (tenant_id, task_id, inserted_at)
-- );

-- CREATE OR REPLACE FUNCTION v2_tasks_olap_status_insert_function()
-- RETURNS TRIGGER AS
-- $$
-- BEGIN
-- INSERT INTO v2_task_statuses (
-- tenant_id,
-- task_id,
-- inserted_at,
-- readable_status,
-- retry_count
-- )
-- VALUES (
-- NEW.tenant_id,
-- NEW.task_id,
-- NEW.task_inserted_at,
-- NEW.readable_status,
-- NEW.retry_count
-- ) ON CONFLICT (tenant_id, task_id, inserted_at) DO NOTHING;
-- RETURN NEW;
-- END;
-- $$
-- LANGUAGE plpgsql;

-- CREATE TRIGGER v2_tasks_olap_status_insert_trigger
-- AFTER INSERT ON v2_task_events_olap
-- FOR EACH ROW
-- EXECUTE PROCEDURE v2_tasks_olap_status_insert_function();

-- SELECT * from create_hypertable('v2_task_statuses', by_range('inserted_at', INTERVAL '1 hour'));
ORDER BY bucket_2
WITH NO DATA;

-- Register a refresh policy for the v2_cagg_status_metrics continuous
-- aggregate: schedule_interval and end_offset are both 1 minute, and
-- start_offset is NULL.
-- NOTE(review): this view selects FROM v2_cagg_task_status, so its contents
-- presumably lag that view's own refresh -- verify the two policies interact
-- as intended. A NULL start_offset presumably leaves the old edge of the
-- refresh window unbounded -- confirm against the TimescaleDB documentation.
SELECT add_continuous_aggregate_policy('v2_cagg_status_metrics',
start_offset => NULL,
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');

0 comments on commit e341527

Please sign in to comment.