Skip to content

Commit

Permalink
fix: bug on running/queued queries
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Feb 3, 2025
1 parent 6b58192 commit d9aa4ee
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/repository/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (r *olapEventRepository) ListTaskRuns(ctx context.Context, tenantId string,
Valid: true,
}

realTimeParams.Statuses = []string{
realTimeParams.Eventstatuses = []string{
string(timescalev2.V2ReadableStatusOlapQUEUED),
}

Expand All @@ -159,9 +159,11 @@ func (r *olapEventRepository) ListTaskRuns(ctx context.Context, tenantId string,
Valid: true,
}
} else {
realTimeParams.Eventstatuses = make([]string, 0)
realTimeParams.Statuses = make([]string, 0)

for _, status := range statuses {
realTimeParams.Eventstatuses = append(realTimeParams.Eventstatuses, string(status))
realTimeParams.Statuses = append(realTimeParams.Statuses, string(status))
}
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/repository/v2/timescalev2/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ WITH relevant_events AS (
WHERE
tenant_id = @tenantId::uuid
AND inserted_at >= @insertedAfter::timestamptz
AND readable_status = ANY(cast(@statuses::text[] as v2_readable_status_olap[]))
AND readable_status = ANY(cast(@eventStatuses::text[] as v2_readable_status_olap[]))
AND (
sqlc.narg('workflowIds')::uuid[] IS NULL OR workflow_id = ANY(sqlc.narg('workflowIds')::uuid[])
)
Expand Down Expand Up @@ -193,16 +193,23 @@ WITH relevant_events AS (
v2_task_events_olap e
JOIN
unique_tasks t ON t.tenant_id = e.tenant_id AND t.task_id = e.task_id AND t.task_inserted_at = e.task_inserted_at
), agg AS (
SELECT
tenant_id,
task_id,
task_inserted_at,
(array_agg(readable_status ORDER BY retry_count DESC, readable_status DESC))[1]::v2_readable_status_olap AS status,
max(retry_count)::integer AS max_retry_count
FROM all_task_events
GROUP BY tenant_id, task_id, task_inserted_at
ORDER BY task_inserted_at DESC, task_id DESC
)
SELECT
tenant_id,
task_id,
task_inserted_at,
(array_agg(readable_status ORDER BY retry_count DESC, readable_status DESC))[1]::v2_readable_status_olap AS status,
max(retry_count)::integer AS max_retry_count
FROM all_task_events
GROUP BY tenant_id, task_id, task_inserted_at
ORDER BY task_inserted_at DESC, task_id DESC
*
FROM
agg
WHERE
sqlc.narg('statuses')::text[] IS NULL OR status = ANY(cast(@statuses::text[] as v2_readable_status_olap[]))
LIMIT @taskLimit::integer;

-- name: ListTasksFromAggregate :many
Expand Down
43 changes: 26 additions & 17 deletions pkg/repository/v2/timescalev2/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d9aa4ee

Please sign in to comment.