From c196bb6aa523a8a8c22a481e90300d5b02425baf Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Mon, 19 Aug 2024 15:08:12 -0700 Subject: [PATCH] carve off async_calls for cron and future dependency, use sqlc.embed to dedupe code --- Justfile | 2 +- backend/controller/cronjobs/dal.go | 34 +- .../cronjobs/sql/async_queries.sql.go | 97 + backend/controller/cronjobs/sql/models.go | 42 - backend/controller/cronjobs/sql/querier.go | 109 - backend/controller/cronjobs/sql/queries.sql | 13 +- .../controller/cronjobs/sql/queries.sql.go | 2731 +---------------- backend/controller/sql/async_queries.sql | 37 + backend/controller/sql/async_queries.sql.go | 97 + backend/controller/sql/models.go | 42 - backend/controller/sql/queries.sql | 29 - backend/controller/sql/queries.sql.go | 152 +- ...0240815164808_async_calls_cron_job_key.sql | 2 + internal/configuration/sql/models.go | 42 - sqlc.yaml | 8 +- 15 files changed, 355 insertions(+), 3082 deletions(-) create mode 100644 backend/controller/cronjobs/sql/async_queries.sql.go create mode 100644 backend/controller/sql/async_queries.sql create mode 100644 backend/controller/sql/async_queries.sql.go diff --git a/Justfile b/Justfile index 0796340e95..774c2e2f33 100644 --- a/Justfile +++ b/Justfile @@ -75,7 +75,7 @@ init-db: # Regenerate SQLC code (requires init-db to be run first) build-sqlc: - @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/cronjobs/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate" + @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate" # Build the ZIP files that are embedded in the FTL release binaries build-zips: build-kt-runtime diff --git a/backend/controller/cronjobs/dal.go b/backend/controller/cronjobs/dal.go index c31a2b7783..dd1833b5ef 100644 --- a/backend/controller/cronjobs/dal.go +++ b/backend/controller/cronjobs/dal.go @@ -64,27 +64,15 @@ func (t *Tx) Rollback(ctx context.Context) error { return nil } -func cronJobFromGetByKeyRow(row sql.GetCronJobByKeyRow) model.CronJob { +func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob { return model.CronJob{ - Key: row.Key, - DeploymentKey: row.DeploymentKey, - Verb: schema.Ref{Module: row.Module, Name: row.Verb}, - Schedule: row.Schedule, - StartTime: row.StartTime, - NextExecution: row.NextExecution, - LastExecution: row.LastExecution, - } -} - -func cronJobFromGetUnscheduledRow(row sql.GetUnscheduledCronJobsRow) model.CronJob { - return model.CronJob{ - Key: row.Key, - DeploymentKey: row.DeploymentKey, - Verb: schema.Ref{Module: row.Module, Name: row.Verb}, - Schedule: row.Schedule, - StartTime: row.StartTime, - NextExecution: row.NextExecution, - LastExecution: row.LastExecution, + Key: c.Key, + DeploymentKey: d.Key, + Verb: schema.Ref{Module: c.ModuleName, Name: c.Verb}, + Schedule: c.Schedule, + StartTime: c.StartTime, + NextExecution: c.NextExecution, + LastExecution: c.LastExecution, } } @@ -95,7 +83,9 @@ func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ( if err != nil { return nil, fmt.Errorf("failed to get cron jobs: %w", dalerrs.TranslatePGError(err)) } - return slices.Map(rows, cronJobFromGetUnscheduledRow), nil + return slices.Map(rows, func(r sql.GetUnscheduledCronJobsRow) model.CronJob { + return cronJobFromRow(r.CronJob, r.Deployment) + }), nil } // GetCronJobByKey returns a cron job by its key @@ -104,5 +94,5 @@ func (d *DAL) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (model. if err != nil { return model.CronJob{}, fmt.Errorf("failed to get cron job %q: %w", key, dalerrs.TranslatePGError(err)) } - return cronJobFromGetByKeyRow(row), nil + return cronJobFromRow(row.CronJob, row.Deployment), nil } diff --git a/backend/controller/cronjobs/sql/async_queries.sql.go b/backend/controller/cronjobs/sql/async_queries.sql.go new file mode 100644 index 0000000000..cc4a119160 --- /dev/null +++ b/backend/controller/cronjobs/sql/async_queries.sql.go @@ -0,0 +1,97 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: async_queries.sql + +package sql + +import ( + "context" + "encoding/json" + "time" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" +) + +const createAsyncCall = `-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context, + cron_job_key +) +VALUES ( + $1::TIMESTAMPTZ, + $2, + $3, + $4, + $5, + $6::interval, + $7::interval, + $8, + $9, + $10::jsonb, + $11 +) +RETURNING id +` + +type CreateAsyncCallParams struct { + ScheduledAt time.Time + Verb schema.RefKey + Origin string + Request []byte + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext json.RawMessage + CronJobKey optional.Option[model.CronJobKey] +} + +func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createAsyncCall, + arg.ScheduledAt, + arg.Verb, + arg.Origin, + arg.Request, + arg.RemainingAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, + arg.CronJobKey, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.cron_job_key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending +` + +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err +} diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index 883d140976..17f10334f0 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -105,48 +105,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( diff --git a/backend/controller/cronjobs/sql/querier.go b/backend/controller/cronjobs/sql/querier.go index 19868fd5b9..4c726cbf5a 100644 --- a/backend/controller/cronjobs/sql/querier.go +++ b/backend/controller/cronjobs/sql/querier.go @@ -6,127 +6,18 @@ package sql import ( "context" - "encoding/json" "time" - "github.com/TBD54566975/ftl/backend/controller/leases" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" - "github.com/alecthomas/types/optional" - "github.com/google/uuid" - "github.com/sqlc-dev/pqtype" ) type Querier interface { - // Reserve a pending async call for execution, returning the associated lease - // reservation key and accompanying metadata. - AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error) - AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error - AsyncCallQueueDepth(ctx context.Context) (int64, error) - BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error - CompleteEventForSubscription(ctx context.Context, name string, module string) error - // Create a new artefact and return the artefact ID. - CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error - CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error - CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error - CreateOnlyEncryptionKey(ctx context.Context, key []byte) error - CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) - DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) - DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) - DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) - ExpireLeases(ctx context.Context) (int64, error) - ExpireRunnerReservations(ctx context.Context) (int64, error) - FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) - FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error) - FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) - // Mark an FSM transition as completed, updating the current state and clearing the async call ID. - FinishFSMTransition(ctx context.Context, fsm schema.RefKey, key string) (bool, error) - GetActiveControllers(ctx context.Context) ([]Controller, error) - GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) - GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) - GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error) - GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) - GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) - // Return the digests that exist in the database. - GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) - GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) - // Get all artefacts matching the given digests. - GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) - GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error) - // Get deployments that have a mismatch between the number of assigned and required replicas. - GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error) - // Get all deployments that have artefacts matching the given digests. - GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error) - GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) - GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) - GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error) - GetIdleRunners(ctx context.Context, labels json.RawMessage, limit int64) ([]Runner, error) - // Get the runner endpoints corresponding to the given ingress route. - GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) - GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error) - GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) - GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) - GetOnlyEncryptionKey(ctx context.Context) ([]byte, error) - GetProcessList(ctx context.Context) ([]GetProcessListRow, error) - GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) - // Retrieve routing information for a runner. - GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) - GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error) - GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error) - GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) - GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) - GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) - GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) - // Results may not be ready to be scheduled yet due to event consumption delay - // Sorting ensures that brand new events (that may not be ready for consumption) - // don't prevent older events from being consumed - GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) - GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) - GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) - InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error - InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error - InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error - InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error - InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error - InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) - // Mark any controller entries that haven't been updated recently as dead. - KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) - KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) - LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) - NewLease(ctx context.Context, key leases.Key, ttl sqltypes.Duration, metadata pqtype.NullRawMessage) (uuid.UUID, error) - PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (FsmNextEvent, error) - PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error - ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) - RenewLease(ctx context.Context, ttl sqltypes.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) - // Find an idle runner and reserve it for the given deployment. - ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels json.RawMessage) (Runner, error) - SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error - SetNextFSMEvent(ctx context.Context, arg SetNextFSMEventParams) (int64, error) - SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error - // Start a new FSM transition, populating the destination state and async call ID. - // - // "key" is the unique identifier for the FSM execution. - StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) - SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error) - SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error - UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) - UpsertModule(ctx context.Context, language string, name string) (int64, error) - // Upsert a runner and return the deployment ID that it is assigned to, if any. - // If the deployment key is null, then deployment_rel.id will be null, - // otherwise we try to retrieve the deployments.id using the key. If - // there is no corresponding deployment, then the deployment ID is -1 - // and the parent statement will fail due to a foreign key constraint. - UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error) - UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) - UpsertTopic(ctx context.Context, arg UpsertTopicParams) error } var _ Querier = (*Queries)(nil) diff --git a/backend/controller/cronjobs/sql/queries.sql b/backend/controller/cronjobs/sql/queries.sql index f17aba697f..ec05c101ce 100644 --- a/backend/controller/cronjobs/sql/queries.sql +++ b/backend/controller/cronjobs/sql/queries.sql @@ -1,5 +1,5 @@ -- name: GetUnscheduledCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution +SELECT sqlc.embed(j), sqlc.embed(d) FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 @@ -16,17 +16,8 @@ WHERE d.min_replicas > 0 ) FOR UPDATE SKIP LOCKED; --- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = sqlc.arg('key')::cron_job_key - AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending; - -- name: GetCronJobByKey :one -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution +SELECT sqlc.embed(j), sqlc.embed(d) FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE j.key = sqlc.arg('key')::cron_job_key diff --git a/backend/controller/cronjobs/sql/queries.sql.go b/backend/controller/cronjobs/sql/queries.sql.go index bc48f6fb4a..4c3368d674 100644 --- a/backend/controller/cronjobs/sql/queries.sql.go +++ b/backend/controller/cronjobs/sql/queries.sql.go @@ -7,250 +7,11 @@ package sql import ( "context" - "encoding/json" "time" - "github.com/TBD54566975/ftl/backend/controller/leases" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" - "github.com/alecthomas/types/optional" - "github.com/google/uuid" - "github.com/lib/pq" - "github.com/sqlc-dev/pqtype" ) -const acquireAsyncCall = `-- name: AcquireAsyncCall :one -WITH pending_calls AS ( - SELECT id - FROM async_calls - WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') - ORDER BY created_at -), async_call AS ( - SELECT id - FROM pending_calls - LIMIT 1 - FOR UPDATE SKIP LOCKED -), lease AS ( - INSERT INTO leases (idempotency_key, key, expires_at) - SELECT gen_random_uuid(), '/system/async_call/' || (SELECT id FROM async_call), (NOW() AT TIME ZONE 'utc') + $1::interval - WHERE (SELECT id FROM async_call) IS NOT NULL - RETURNING id, idempotency_key, key, created_at, expires_at, metadata -) -UPDATE async_calls -SET state = 'executing', lease_id = (SELECT id FROM lease) -WHERE id = (SELECT id FROM async_call) -RETURNING - id AS async_call_id, - (SELECT idempotency_key FROM lease) AS lease_idempotency_key, - (SELECT key FROM lease) AS lease_key, - (SELECT count(*) FROM pending_calls) AS queue_depth, - origin, - verb, - catch_verb, - request, - scheduled_at, - remaining_attempts, - error, - backoff, - max_backoff, - parent_request_key, - trace_context, - catching -` - -type AcquireAsyncCallRow struct { - AsyncCallID int64 - LeaseIdempotencyKey uuid.UUID - LeaseKey leases.Key - QueueDepth int64 - Origin string - Verb schema.RefKey - CatchVerb optional.Option[schema.RefKey] - Request []byte - ScheduledAt time.Time - RemainingAttempts int32 - Error optional.Option[string] - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - ParentRequestKey optional.Option[string] - TraceContext pqtype.NullRawMessage - Catching bool -} - -// Reserve a pending async call for execution, returning the associated lease -// reservation key and accompanying metadata. -func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error) { - row := q.db.QueryRowContext(ctx, acquireAsyncCall, ttl) - var i AcquireAsyncCallRow - err := row.Scan( - &i.AsyncCallID, - &i.LeaseIdempotencyKey, - &i.LeaseKey, - &i.QueueDepth, - &i.Origin, - &i.Verb, - &i.CatchVerb, - &i.Request, - &i.ScheduledAt, - &i.RemainingAttempts, - &i.Error, - &i.Backoff, - &i.MaxBackoff, - &i.ParentRequestKey, - &i.TraceContext, - &i.Catching, - ) - return i, err -} - -const associateArtefactWithDeployment = `-- name: AssociateArtefactWithDeployment :exec -INSERT INTO deployment_artefacts (deployment_id, artefact_id, executable, path) -VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key), $2, $3, $4) -` - -type AssociateArtefactWithDeploymentParams struct { - Key model.DeploymentKey - ArtefactID int64 - Executable bool - Path string -} - -func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error { - _, err := q.db.ExecContext(ctx, associateArtefactWithDeployment, - arg.Key, - arg.ArtefactID, - arg.Executable, - arg.Path, - ) - return err -} - -const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') -` - -func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) - var count int64 - err := row.Scan(&count) - return count, err -} - -const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec -WITH event AS ( - SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context - FROM topic_events - WHERE "key" = $2::topic_event_key -) -UPDATE topic_subscriptions -SET state = 'executing', - cursor = (SELECT id FROM event) -WHERE key = $1::subscription_key -` - -func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error { - _, err := q.db.ExecContext(ctx, beginConsumingTopicEvent, subscription, event) - return err -} - -const completeEventForSubscription = `-- name: CompleteEventForSubscription :exec -WITH module AS ( - SELECT id - FROM modules - WHERE name = $2::TEXT -) -UPDATE topic_subscriptions -SET state = 'idle' -WHERE name = $1::TEXT - AND module_id = (SELECT id FROM module) -` - -func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { - _, err := q.db.ExecContext(ctx, completeEventForSubscription, name, module) - return err -} - -const createArtefact = `-- name: CreateArtefact :one -INSERT INTO artefacts (digest, content) -VALUES ($1, $2) -ON CONFLICT (digest) DO NOTHING -RETURNING id -` - -// Create a new artefact and return the artefact ID. -func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) { - row := q.db.QueryRowContext(ctx, createArtefact, digest, content) - var id int64 - err := row.Scan(&id) - return id, err -} - -const createAsyncCall = `-- name: CreateAsyncCall :one -INSERT INTO async_calls ( - scheduled_at, - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context, - cron_job_key -) -VALUES ( - $1::TIMESTAMPTZ, - $2, - $3, - $4, - $5, - $6::interval, - $7::interval, - $8, - $9, - $10::jsonb, - $11 -) -RETURNING id -` - -type CreateAsyncCallParams struct { - ScheduledAt time.Time - Verb schema.RefKey - Origin string - Request []byte - RemainingAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] - ParentRequestKey optional.Option[string] - TraceContext json.RawMessage - CronJobKey optional.Option[model.CronJobKey] -} - -func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { - row := q.db.QueryRowContext(ctx, createAsyncCall, - arg.ScheduledAt, - arg.Verb, - arg.Origin, - arg.Request, - arg.RemainingAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - arg.ParentRequestKey, - arg.TraceContext, - arg.CronJobKey, - ) - var id int64 - err := row.Scan(&id) - return id, err -} - const createCronJob = `-- name: CreateCronJob :exec INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) VALUES ( @@ -286,412 +47,86 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } -const createDeployment = `-- name: CreateDeployment :exec -INSERT INTO deployments (module_id, "schema", "key") -VALUES ((SELECT id FROM modules WHERE name = $1::TEXT LIMIT 1), $2::BYTEA, $3::deployment_key) -` - -func (q *Queries) CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error { - _, err := q.db.ExecContext(ctx, createDeployment, moduleName, schema, key) - return err -} - -const createIngressRoute = `-- name: CreateIngressRoute :exec -INSERT INTO ingress_routes (deployment_id, module, verb, method, path) -VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1), $2, $3, $4, $5) -` - -type CreateIngressRouteParams struct { - Key model.DeploymentKey - Module string - Verb string - Method string - Path string -} - -func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error { - _, err := q.db.ExecContext(ctx, createIngressRoute, - arg.Key, - arg.Module, - arg.Verb, - arg.Method, - arg.Path, - ) - return err -} - -const createOnlyEncryptionKey = `-- name: CreateOnlyEncryptionKey :exec -INSERT INTO encryption_keys (id, key) -VALUES (1, $1) -` - -func (q *Queries) CreateOnlyEncryptionKey(ctx context.Context, key []byte) error { - _, err := q.db.ExecContext(ctx, createOnlyEncryptionKey, key) - return err -} - -const createRequest = `-- name: CreateRequest :exec -INSERT INTO requests (origin, "key", source_addr) -VALUES ($1, $2, $3) -` - -func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error { - _, err := q.db.ExecContext(ctx, createRequest, origin, key, sourceAddr) - return err -} - -const deleteOldTimelineEvents = `-- name: DeleteOldTimelineEvents :one -WITH deleted AS ( - DELETE FROM timeline - WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL - AND type = $2 - RETURNING 1 -) -SELECT COUNT(*) -FROM deleted -` - -func (q *Queries) DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { - row := q.db.QueryRowContext(ctx, deleteOldTimelineEvents, timeout, type_) - var count int64 - err := row.Scan(&count) - return count, err -} - -const deleteSubscribers = `-- name: DeleteSubscribers :many -DELETE FROM topic_subscribers -WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = $1::deployment_key -) -RETURNING topic_subscribers.key -` - -func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) { - rows, err := q.db.QueryContext(ctx, deleteSubscribers, deployment) - if err != nil { - return nil, err - } - defer rows.Close() - var items []model.SubscriberKey - for rows.Next() { - var key model.SubscriberKey - if err := rows.Scan(&key); err != nil { - return nil, err - } - items = append(items, key) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const deleteSubscriptions = `-- name: DeleteSubscriptions :many -DELETE FROM topic_subscriptions -WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = $1::deployment_key -) -RETURNING topic_subscriptions.key -` - -func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) { - rows, err := q.db.QueryContext(ctx, deleteSubscriptions, deployment) - if err != nil { - return nil, err - } - defer rows.Close() - var items []model.SubscriptionKey - for rows.Next() { - var key model.SubscriptionKey - if err := rows.Scan(&key); err != nil { - return nil, err - } - items = append(items, key) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const deregisterRunner = `-- name: DeregisterRunner :one -WITH matches AS ( - UPDATE runners - SET state = 'dead', - deployment_id = NULL - WHERE key = $1::runner_key - RETURNING 1) -SELECT COUNT(*) -FROM matches -` - -func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) { - row := q.db.QueryRowContext(ctx, deregisterRunner, key) - var count int64 - err := row.Scan(&count) - return count, err -} - -const expireLeases = `-- name: ExpireLeases :one -WITH expired AS ( - DELETE FROM leases - WHERE expires_at < NOW() AT TIME ZONE 'utc' - RETURNING 1 -) -SELECT COUNT(*) -FROM expired -` - -func (q *Queries) ExpireLeases(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, expireLeases) - var count int64 - err := row.Scan(&count) - return count, err -} - -const expireRunnerReservations = `-- name: ExpireRunnerReservations :one -WITH rows AS ( - UPDATE runners - SET state = 'idle', - deployment_id = NULL, - reservation_timeout = NULL - WHERE state = 'reserved' - AND reservation_timeout < (NOW() AT TIME ZONE 'utc') - RETURNING 1) -SELECT COUNT(*) -FROM rows -` - -func (q *Queries) ExpireRunnerReservations(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, expireRunnerReservations) - var count int64 - err := row.Scan(&count) - return count, err -} - -const failAsyncCall = `-- name: FailAsyncCall :one -UPDATE async_calls -SET - state = 'error'::async_call_state, - error = $1::TEXT -WHERE id = $2 -RETURNING true -` - -func (q *Queries) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) { - row := q.db.QueryRowContext(ctx, failAsyncCall, error, iD) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const failAsyncCallWithRetry = `-- name: FailAsyncCallWithRetry :one -WITH updated AS ( - UPDATE async_calls - SET state = 'error'::async_call_state, - error = $7::TEXT - WHERE id = $8::BIGINT - RETURNING id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context, cron_job_key -) -INSERT INTO async_calls ( - verb, - origin, - request, - catch_verb, - remaining_attempts, - backoff, - max_backoff, - scheduled_at, - catching, - error -) -SELECT - updated.verb, - updated.origin, - updated.request, - updated.catch_verb, - $1, - $2::interval, - $3::interval, - $4::TIMESTAMPTZ, - $5::bool, - $6 -FROM updated -RETURNING true +const getCronJobByKey = `-- name: GetCronJobByKey :one +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +FROM cron_jobs j + INNER JOIN deployments d on j.deployment_id = d.id +WHERE j.key = $1::cron_job_key +FOR UPDATE SKIP LOCKED ` -type FailAsyncCallWithRetryParams struct { - RemainingAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - ScheduledAt time.Time - Catching bool - OriginalError optional.Option[string] - Error string - ID int64 +type GetCronJobByKeyRow struct { + CronJob CronJob + Deployment Deployment } -func (q *Queries) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error) { - row := q.db.QueryRowContext(ctx, failAsyncCallWithRetry, - arg.RemainingAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.ScheduledAt, - arg.Catching, - arg.OriginalError, - arg.Error, - arg.ID, +func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) { + row := q.db.QueryRowContext(ctx, getCronJobByKey, key) + var i GetCronJobByKeyRow + err := row.Scan( + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, ) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const failFSMInstance = `-- name: FailFSMInstance :one -UPDATE fsm_instances -SET - current_state = NULL, - async_call_id = NULL, - status = 'failed'::fsm_status, - updated_at = NOW() AT TIME ZONE 'utc' -WHERE - fsm = $1::schema_ref AND key = $2::TEXT -RETURNING true -` - -func (q *Queries) FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRowContext(ctx, failFSMInstance, fsm, key) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const finishFSMTransition = `-- name: FinishFSMTransition :one -UPDATE fsm_instances -SET - current_state = destination_state, - destination_state = NULL, - async_call_id = NULL, - updated_at = NOW() AT TIME ZONE 'utc' -WHERE - fsm = $1::schema_ref AND key = $2::TEXT -RETURNING true -` - -// Mark an FSM transition as completed, updating the current state and clearing the async call ID. -func (q *Queries) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRowContext(ctx, finishFSMTransition, fsm, key) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const getActiveControllers = `-- name: GetActiveControllers :many -SELECT id, key, created, last_seen, state, endpoint -FROM controller c -WHERE c.state <> 'dead' -ORDER BY c.key -` - -func (q *Queries) GetActiveControllers(ctx context.Context) ([]Controller, error) { - rows, err := q.db.QueryContext(ctx, getActiveControllers) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Controller - for rows.Next() { - var i Controller - if err := rows.Scan( - &i.ID, - &i.Key, - &i.Created, - &i.LastSeen, - &i.State, - &i.Endpoint, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getActiveDeploymentSchemas = `-- name: GetActiveDeploymentSchemas :many -SELECT key, schema FROM deployments WHERE min_replicas > 0 -` - -type GetActiveDeploymentSchemasRow struct { - Key model.DeploymentKey - Schema *schema.Module -} - -func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveDeploymentSchemas) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetActiveDeploymentSchemasRow - for rows.Next() { - var i GetActiveDeploymentSchemasRow - if err := rows.Scan(&i.Key, &i.Schema); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil + return i, err } -const getActiveDeployments = `-- name: GetActiveDeployments :many -SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, m.name AS module_name, m.language, COUNT(r.id) AS replicas -FROM deployments d - JOIN modules m ON d.module_id = m.id - JOIN runners r ON d.id = r.deployment_id -WHERE min_replicas > 0 AND r.state = 'assigned' -GROUP BY d.id, m.name, m.language -HAVING COUNT(r.id) > 0 +const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +FROM cron_jobs j + INNER JOIN deployments d on j.deployment_id = d.id +WHERE d.min_replicas > 0 + AND j.start_time < $1::TIMESTAMPTZ + AND ( + j.last_execution IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE + ac.cron_job_key = j.key + AND ac.scheduled_at > j.last_execution::TIMESTAMPTZ + ) + ) +FOR UPDATE SKIP LOCKED ` -type GetActiveDeploymentsRow struct { +type GetUnscheduledCronJobsRow struct { + CronJob CronJob Deployment Deployment - ModuleName string - Language string - Replicas int64 } -func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveDeployments) +func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) { + rows, err := q.db.QueryContext(ctx, getUnscheduledCronJobs, startTime) if err != nil { return nil, err } defer rows.Close() - var items []GetActiveDeploymentsRow + var items []GetUnscheduledCronJobsRow for rows.Next() { - var i GetActiveDeploymentsRow + var i GetUnscheduledCronJobsRow if err := rows.Scan( + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, &i.Deployment.ID, &i.Deployment.CreatedAt, &i.Deployment.ModuleID, @@ -699,9 +134,6 @@ func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeployme &i.Deployment.Schema, &i.Deployment.Labels, &i.Deployment.MinReplicas, - &i.ModuleName, - &i.Language, - &i.Replicas, ); err != nil { return nil, err } @@ -716,2043 +148,14 @@ func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeployme return items, nil } -const getActiveIngressRoutes = `-- name: GetActiveIngressRoutes :many -SELECT d.key AS deployment_key, ir.module, ir.verb, ir.method, ir.path -FROM ingress_routes ir - INNER JOIN deployments d ON ir.deployment_id = d.id -WHERE d.min_replicas > 0 -` - -type GetActiveIngressRoutesRow struct { - DeploymentKey model.DeploymentKey - Module string - Verb string - Method string - Path string -} - -func (q *Queries) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveIngressRoutes) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetActiveIngressRoutesRow - for rows.Next() { - var i GetActiveIngressRoutesRow - if err := rows.Scan( - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Method, - &i.Path, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getActiveRunners = `-- name: GetActiveRunners :many -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.state, - r.labels, - r.last_seen, - r.module_name, - COALESCE(CASE - WHEN r.deployment_id IS NOT NULL - THEN d.key END, NULL) AS deployment_key -FROM runners r - LEFT JOIN deployments d on d.id = r.deployment_id -WHERE r.state <> 'dead' -ORDER BY r.key -` - -type GetActiveRunnersRow struct { - RunnerKey model.RunnerKey - Endpoint string - State RunnerState - Labels json.RawMessage - LastSeen time.Time - ModuleName optional.Option[string] - DeploymentKey optional.Option[string] -} - -func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveRunners) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetActiveRunnersRow - for rows.Next() { - var i GetActiveRunnersRow - if err := rows.Scan( - &i.RunnerKey, - &i.Endpoint, - &i.State, - &i.Labels, - &i.LastSeen, - &i.ModuleName, - &i.DeploymentKey, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getArtefactContentRange = `-- name: GetArtefactContentRange :one -SELECT SUBSTRING(a.content FROM $1 FOR $2)::BYTEA AS content -FROM artefacts a -WHERE a.id = $3 -` - -func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) { - row := q.db.QueryRowContext(ctx, getArtefactContentRange, start, count, iD) - var content []byte - err := row.Scan(&content) - return content, err -} - -const getArtefactDigests = `-- name: GetArtefactDigests :many -SELECT id, digest -FROM artefacts -WHERE digest = ANY ($1::bytea[]) -` - -type GetArtefactDigestsRow struct { - ID int64 - Digest []byte -} - -// Return the digests that exist in the database. -func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) { - rows, err := q.db.QueryContext(ctx, getArtefactDigests, pq.Array(digests)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetArtefactDigestsRow - for rows.Next() { - var i GetArtefactDigestsRow - if err := rows.Scan(&i.ID, &i.Digest); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getCronJobByKey = `-- name: GetCronJobByKey :one -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id -WHERE j.key = $1::cron_job_key -FOR UPDATE SKIP LOCKED -` - -type GetCronJobByKeyRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - LastExecution optional.Option[time.Time] -} - -func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) { - row := q.db.QueryRowContext(ctx, getCronJobByKey, key) - var i GetCronJobByKeyRow - err := row.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.LastExecution, - ) - return i, err -} - -const getDeployment = `-- name: GetDeployment :one -SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, m.language, m.name AS module_name, d.min_replicas -FROM deployments d - INNER JOIN modules m ON m.id = d.module_id -WHERE d.key = $1::deployment_key -` - -type GetDeploymentRow struct { - Deployment Deployment - Language string - ModuleName string - MinReplicas int32 -} - -func (q *Queries) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) { - row := q.db.QueryRowContext(ctx, getDeployment, key) - var i GetDeploymentRow - err := row.Scan( - &i.Deployment.ID, - &i.Deployment.CreatedAt, - &i.Deployment.ModuleID, - &i.Deployment.Key, - &i.Deployment.Schema, - &i.Deployment.Labels, - &i.Deployment.MinReplicas, - &i.Language, - &i.ModuleName, - &i.MinReplicas, - ) - return i, err -} - -const getDeploymentArtefacts = `-- name: GetDeploymentArtefacts :many -SELECT da.created_at, artefact_id AS id, executable, path, digest, executable -FROM deployment_artefacts da - INNER JOIN artefacts ON artefacts.id = da.artefact_id -WHERE deployment_id = $1 -` - -type GetDeploymentArtefactsRow struct { - CreatedAt time.Time - ID int64 - Executable bool - Path string - Digest []byte - Executable_2 bool -} - -// Get all artefacts matching the given digests. -func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentArtefacts, deploymentID) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentArtefactsRow - for rows.Next() { - var i GetDeploymentArtefactsRow - if err := rows.Scan( - &i.CreatedAt, - &i.ID, - &i.Executable, - &i.Path, - &i.Digest, - &i.Executable_2, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getDeploymentsByID = `-- name: GetDeploymentsByID :many -SELECT id, created_at, module_id, key, schema, labels, min_replicas -FROM deployments -WHERE id = ANY ($1::BIGINT[]) -` - -func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentsByID, pq.Array(ids)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Deployment - for rows.Next() { - var i Deployment - if err := rows.Scan( - &i.ID, - &i.CreatedAt, - &i.ModuleID, - &i.Key, - &i.Schema, - &i.Labels, - &i.MinReplicas, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getDeploymentsNeedingReconciliation = `-- name: GetDeploymentsNeedingReconciliation :many -SELECT d.key AS deployment_key, - m.name AS module_name, - m.language AS language, - COUNT(r.id) AS assigned_runners_count, - d.min_replicas::BIGINT AS required_runners_count -FROM deployments d - LEFT JOIN runners r ON d.id = r.deployment_id AND r.state <> 'dead' - JOIN modules m ON d.module_id = m.id -GROUP BY d.key, d.min_replicas, m.name, m.language -HAVING COUNT(r.id) <> d.min_replicas -` - -type GetDeploymentsNeedingReconciliationRow struct { - DeploymentKey model.DeploymentKey - ModuleName string - Language string - AssignedRunnersCount int64 - RequiredRunnersCount int64 -} - -// Get deployments that have a mismatch between the number of assigned and required replicas. -func (q *Queries) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentsNeedingReconciliation) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentsNeedingReconciliationRow - for rows.Next() { - var i GetDeploymentsNeedingReconciliationRow - if err := rows.Scan( - &i.DeploymentKey, - &i.ModuleName, - &i.Language, - &i.AssignedRunnersCount, - &i.RequiredRunnersCount, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getDeploymentsWithArtefacts = `-- name: GetDeploymentsWithArtefacts :many -SELECT d.id, d.created_at, d.key as deployment_key, d.schema, m.name AS module_name -FROM deployments d - INNER JOIN modules m ON d.module_id = m.id -WHERE EXISTS (SELECT 1 - FROM deployment_artefacts da - INNER JOIN artefacts a ON da.artefact_id = a.id - WHERE a.digest = ANY ($1::bytea[]) - AND da.deployment_id = d.id - AND d.schema = $2::BYTEA - HAVING COUNT(*) = $3::BIGINT -- Number of unique digests provided -) -` - -type GetDeploymentsWithArtefactsRow struct { - ID int64 - CreatedAt time.Time - DeploymentKey model.DeploymentKey - Schema *schema.Module - ModuleName string -} - -// Get all deployments that have artefacts matching the given digests. -func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentsWithArtefacts, pq.Array(digests), schema, count) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentsWithArtefactsRow - for rows.Next() { - var i GetDeploymentsWithArtefactsRow - if err := rows.Scan( - &i.ID, - &i.CreatedAt, - &i.DeploymentKey, - &i.Schema, - &i.ModuleName, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getDeploymentsWithMinReplicas = `-- name: GetDeploymentsWithMinReplicas :many -SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, m.name AS module_name, m.language -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE min_replicas > 0 -ORDER BY d.key -` - -type GetDeploymentsWithMinReplicasRow struct { - Deployment Deployment - ModuleName string - Language string -} - -func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) { - rows, err := q.db.QueryContext(ctx, getDeploymentsWithMinReplicas) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetDeploymentsWithMinReplicasRow - for rows.Next() { - var i GetDeploymentsWithMinReplicasRow - if err := rows.Scan( - &i.Deployment.ID, - &i.Deployment.CreatedAt, - &i.Deployment.ModuleID, - &i.Deployment.Key, - &i.Deployment.Schema, - &i.Deployment.Labels, - &i.Deployment.MinReplicas, - &i.ModuleName, - &i.Language, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getExistingDeploymentForModule = `-- name: GetExistingDeploymentForModule :one -SELECT d.id, created_at, module_id, key, schema, labels, min_replicas, m.id, language, name -FROM deployments d - INNER JOIN modules m on d.module_id = m.id -WHERE m.name = $1 - AND min_replicas > 0 -LIMIT 1 -` - -type GetExistingDeploymentForModuleRow struct { - ID int64 - CreatedAt time.Time - ModuleID int64 - Key model.DeploymentKey - Schema *schema.Module - Labels json.RawMessage - MinReplicas int32 - ID_2 int64 - Language string - Name string -} - -func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) { - row := q.db.QueryRowContext(ctx, getExistingDeploymentForModule, name) - var i GetExistingDeploymentForModuleRow - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.ModuleID, - &i.Key, - &i.Schema, - &i.Labels, - &i.MinReplicas, - &i.ID_2, - &i.Language, - &i.Name, - ) - return i, err -} - -const getFSMInstance = `-- name: GetFSMInstance :one -SELECT id, created_at, fsm, key, status, current_state, destination_state, async_call_id, updated_at -FROM fsm_instances -WHERE fsm = $1::schema_ref AND key = $2 -` - -func (q *Queries) GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error) { - row := q.db.QueryRowContext(ctx, getFSMInstance, fsm, key) - var i FsmInstance - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.Fsm, - &i.Key, - &i.Status, - &i.CurrentState, - &i.DestinationState, - &i.AsyncCallID, - &i.UpdatedAt, - ) - return i, err -} - -const getIdleRunners = `-- name: GetIdleRunners :many -SELECT id, key, created, last_seen, reservation_timeout, state, endpoint, module_name, deployment_id, labels -FROM runners -WHERE labels @> $1::jsonb - AND state = 'idle' -LIMIT $2 -` - -func (q *Queries) GetIdleRunners(ctx context.Context, labels json.RawMessage, limit int64) ([]Runner, error) { - rows, err := q.db.QueryContext(ctx, getIdleRunners, labels, limit) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Runner - for rows.Next() { - var i Runner - if err := rows.Scan( - &i.ID, - &i.Key, - &i.Created, - &i.LastSeen, - &i.ReservationTimeout, - &i.State, - &i.Endpoint, - &i.ModuleName, - &i.DeploymentID, - &i.Labels, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getIngressRoutes = `-- name: GetIngressRoutes :many -SELECT r.key AS runner_key, d.key AS deployment_key, endpoint, ir.path, ir.module, ir.verb -FROM ingress_routes ir - INNER JOIN runners r ON ir.deployment_id = r.deployment_id - INNER JOIN deployments d ON ir.deployment_id = d.id -WHERE r.state = 'assigned' - AND ir.method = $1 -` - -type GetIngressRoutesRow struct { - RunnerKey model.RunnerKey - DeploymentKey model.DeploymentKey - Endpoint string - Path string - Module string - Verb string -} - -// Get the runner endpoints corresponding to the given ingress route. -func (q *Queries) GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) { - rows, err := q.db.QueryContext(ctx, getIngressRoutes, method) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetIngressRoutesRow - for rows.Next() { - var i GetIngressRoutesRow - if err := rows.Scan( - &i.RunnerKey, - &i.DeploymentKey, - &i.Endpoint, - &i.Path, - &i.Module, - &i.Verb, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getLeaseInfo = `-- name: GetLeaseInfo :one -SELECT expires_at, metadata FROM leases WHERE key = $1::lease_key -` - -type GetLeaseInfoRow struct { - ExpiresAt time.Time - Metadata pqtype.NullRawMessage -} - -func (q *Queries) GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error) { - row := q.db.QueryRowContext(ctx, getLeaseInfo, key) - var i GetLeaseInfoRow - err := row.Scan(&i.ExpiresAt, &i.Metadata) - return i, err -} - -const getModulesByID = `-- name: GetModulesByID :many -SELECT id, language, name -FROM modules -WHERE id = ANY ($1::BIGINT[]) -` - -func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) { - rows, err := q.db.QueryContext(ctx, getModulesByID, pq.Array(ids)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Module - for rows.Next() { - var i Module - if err := rows.Scan(&i.ID, &i.Language, &i.Name); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one -WITH cursor AS ( - SELECT - created_at, - id - FROM topic_events - WHERE "key" = $3::topic_event_key -) -SELECT events."key" as event, - events.payload, - events.created_at, - events.caller, - events.request_key, - events.trace_context, - NOW() - events.created_at >= $1::interval AS ready -FROM topics -LEFT JOIN topic_events as events ON events.topic_id = topics.id -WHERE topics.key = $2::topic_key - AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) -ORDER BY events.created_at, events.id -LIMIT 1 -` - -type GetNextEventForSubscriptionRow struct { - Event optional.Option[model.TopicEventKey] - Payload []byte - CreatedAt optional.Option[time.Time] - Caller optional.Option[string] - RequestKey optional.Option[string] - TraceContext pqtype.NullRawMessage - Ready bool -} - -func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { - row := q.db.QueryRowContext(ctx, getNextEventForSubscription, consumptionDelay, topic, cursor) - var i GetNextEventForSubscriptionRow - err := row.Scan( - &i.Event, - &i.Payload, - &i.CreatedAt, - &i.Caller, - &i.RequestKey, - &i.TraceContext, - &i.Ready, - ) - return i, err -} - -const getOnlyEncryptionKey = `-- name: GetOnlyEncryptionKey :one -SELECT key -FROM encryption_keys -WHERE id = 1 -` - -func (q *Queries) GetOnlyEncryptionKey(ctx context.Context) ([]byte, error) { - row := q.db.QueryRowContext(ctx, getOnlyEncryptionKey) - var key []byte - err := row.Scan(&key) - return key, err -} - -const getProcessList = `-- name: GetProcessList :many -SELECT d.min_replicas, - d.key AS deployment_key, - d.labels deployment_labels, - r.key AS runner_key, - r.endpoint, - r.labels AS runner_labels -FROM deployments d - LEFT JOIN runners r on d.id = r.deployment_id AND r.state != 'dead' -WHERE d.min_replicas > 0 -ORDER BY d.key -` - -type GetProcessListRow struct { - MinReplicas int32 - DeploymentKey model.DeploymentKey - DeploymentLabels json.RawMessage - RunnerKey optional.Option[model.RunnerKey] - Endpoint optional.Option[string] - RunnerLabels pqtype.NullRawMessage -} - -func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) { - rows, err := q.db.QueryContext(ctx, getProcessList) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetProcessListRow - for rows.Next() { - var i GetProcessListRow - if err := rows.Scan( - &i.MinReplicas, - &i.DeploymentKey, - &i.DeploymentLabels, - &i.RunnerKey, - &i.Endpoint, - &i.RunnerLabels, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getRandomSubscriber = `-- name: GetRandomSubscriber :one -SELECT - subscribers.sink as sink, - subscribers.retry_attempts as retry_attempts, - subscribers.backoff as backoff, - subscribers.max_backoff as max_backoff, - subscribers.catch_verb as catch_verb -FROM topic_subscribers as subscribers -JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id -WHERE topic_subscriptions.key = $1::subscription_key -ORDER BY RANDOM() -LIMIT 1 -` - -type GetRandomSubscriberRow struct { - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] -} - -func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) { - row := q.db.QueryRowContext(ctx, getRandomSubscriber, key) - var i GetRandomSubscriberRow - err := row.Scan( - &i.Sink, - &i.RetryAttempts, - &i.Backoff, - &i.MaxBackoff, - &i.CatchVerb, - ) - return i, err -} - -const getRouteForRunner = `-- name: GetRouteForRunner :one -SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key, r.state -FROM runners r - LEFT JOIN deployments d on r.deployment_id = d.id -WHERE r.key = $1::runner_key -` - -type GetRouteForRunnerRow struct { - Endpoint string - RunnerKey model.RunnerKey - ModuleName optional.Option[string] - DeploymentKey optional.Option[model.DeploymentKey] - State RunnerState -} - -// Retrieve routing information for a runner. -func (q *Queries) GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) { - row := q.db.QueryRowContext(ctx, getRouteForRunner, key) - var i GetRouteForRunnerRow - err := row.Scan( - &i.Endpoint, - &i.RunnerKey, - &i.ModuleName, - &i.DeploymentKey, - &i.State, - ) - return i, err -} - -const getRoutingTable = `-- name: GetRoutingTable :many -SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key -FROM runners r - LEFT JOIN deployments d on r.deployment_id = d.id -WHERE state = 'assigned' - AND (COALESCE(cardinality($1::TEXT[]), 0) = 0 - OR module_name = ANY ($1::TEXT[])) -` - -type GetRoutingTableRow struct { - Endpoint string - RunnerKey model.RunnerKey - ModuleName optional.Option[string] - DeploymentKey optional.Option[model.DeploymentKey] -} - -func (q *Queries) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error) { - rows, err := q.db.QueryContext(ctx, getRoutingTable, pq.Array(modules)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetRoutingTableRow - for rows.Next() { - var i GetRoutingTableRow - if err := rows.Scan( - &i.Endpoint, - &i.RunnerKey, - &i.ModuleName, - &i.DeploymentKey, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getRunner = `-- name: GetRunner :one -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.state, - r.labels, - r.last_seen, - r.module_name, - COALESCE(CASE - WHEN r.deployment_id IS NOT NULL - THEN d.key END, NULL) AS deployment_key -FROM runners r - LEFT JOIN deployments d on d.id = r.deployment_id OR r.deployment_id IS NULL -WHERE r.key = $1::runner_key -` - -type GetRunnerRow struct { - RunnerKey model.RunnerKey - Endpoint string - State RunnerState - Labels json.RawMessage - LastSeen time.Time - ModuleName optional.Option[string] - DeploymentKey optional.Option[string] -} - -func (q *Queries) GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error) { - row := q.db.QueryRowContext(ctx, getRunner, key) - var i GetRunnerRow - err := row.Scan( - &i.RunnerKey, - &i.Endpoint, - &i.State, - &i.Labels, - &i.LastSeen, - &i.ModuleName, - &i.DeploymentKey, - ) - return i, err -} - -const getRunnerState = `-- name: GetRunnerState :one -SELECT state -FROM runners -WHERE key = $1::runner_key -` - -func (q *Queries) GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) { - row := q.db.QueryRowContext(ctx, getRunnerState, key) - var state RunnerState - err := row.Scan(&state) - return state, err -} - -const getRunnersForDeployment = `-- name: GetRunnersForDeployment :many -SELECT r.id, r.key, created, last_seen, reservation_timeout, state, endpoint, module_name, deployment_id, r.labels, d.id, created_at, module_id, d.key, schema, d.labels, min_replicas -FROM runners r - INNER JOIN deployments d on r.deployment_id = d.id -WHERE state = 'assigned' - AND d.key = $1::deployment_key -` - -type GetRunnersForDeploymentRow struct { - ID int64 - Key model.RunnerKey - Created time.Time - LastSeen time.Time - ReservationTimeout optional.Option[time.Time] - State RunnerState - Endpoint string - ModuleName optional.Option[string] - DeploymentID optional.Option[int64] - Labels json.RawMessage - ID_2 int64 - CreatedAt time.Time - ModuleID int64 - Key_2 model.DeploymentKey - Schema *schema.Module - Labels_2 json.RawMessage - MinReplicas int32 -} - -func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) { - rows, err := q.db.QueryContext(ctx, getRunnersForDeployment, key) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetRunnersForDeploymentRow - for rows.Next() { - var i GetRunnersForDeploymentRow - if err := rows.Scan( - &i.ID, - &i.Key, - &i.Created, - &i.LastSeen, - &i.ReservationTimeout, - &i.State, - &i.Endpoint, - &i.ModuleName, - &i.DeploymentID, - &i.Labels, - &i.ID_2, - &i.CreatedAt, - &i.ModuleID, - &i.Key_2, - &i.Schema, - &i.Labels_2, - &i.MinReplicas, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getSchemaForDeployment = `-- name: GetSchemaForDeployment :one -SELECT schema FROM deployments WHERE key = $1::deployment_key -` - -func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) { - row := q.db.QueryRowContext(ctx, getSchemaForDeployment, key) - var schema *schema.Module - err := row.Scan(&schema) - return schema, err -} - -const getSubscription = `-- name: GetSubscription :one -WITH module AS ( - SELECT id - FROM modules - WHERE name = $2::TEXT -) -SELECT id, key, created_at, topic_id, module_id, deployment_id, name, cursor, state -FROM topic_subscriptions -WHERE name = $1::TEXT - AND module_id = (SELECT id FROM module) -` - -func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) { - row := q.db.QueryRowContext(ctx, getSubscription, column1, column2) - var i TopicSubscription - err := row.Scan( - &i.ID, - &i.Key, - &i.CreatedAt, - &i.TopicID, - &i.ModuleID, - &i.DeploymentID, - &i.Name, - &i.Cursor, - &i.State, - ) - return i, err -} - -const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many -SELECT - subs.key::subscription_key as key, - curser.key as cursor, - topics.key::topic_key as topic, - subs.name -FROM topic_subscriptions subs -LEFT JOIN topics ON subs.topic_id = topics.id -LEFT JOIN topic_events curser ON subs.cursor = curser.id -WHERE subs.cursor IS DISTINCT FROM topics.head - AND subs.state = 'idle' -ORDER BY curser.created_at -LIMIT 3 -FOR UPDATE OF subs SKIP LOCKED -` - -type GetSubscriptionsNeedingUpdateRow struct { - Key model.SubscriptionKey - Cursor optional.Option[model.TopicEventKey] - Topic model.TopicKey - Name string -} - -// Results may not be ready to be scheduled yet due to event consumption delay -// Sorting ensures that brand new events (that may not be ready for consumption) -// don't prevent older events from being consumed -func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { - rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetSubscriptionsNeedingUpdateRow - for rows.Next() { - var i GetSubscriptionsNeedingUpdateRow - if err := rows.Scan( - &i.Key, - &i.Cursor, - &i.Topic, - &i.Name, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getTopic = `-- name: GetTopic :one -SELECT id, key, created_at, module_id, name, type, head -FROM topics -WHERE id = $1::BIGINT -` - -func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { - row := q.db.QueryRowContext(ctx, getTopic, dollar_1) - var i Topic - err := row.Scan( - &i.ID, - &i.Key, - &i.CreatedAt, - &i.ModuleID, - &i.Name, - &i.Type, - &i.Head, - ) - return i, err -} - -const getTopicEvent = `-- name: GetTopicEvent :one -SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context -FROM topic_events -WHERE id = $1::BIGINT -` - -func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) { - row := q.db.QueryRowContext(ctx, getTopicEvent, dollar_1) - var i TopicEvent - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.Key, - &i.TopicID, - &i.Payload, - &i.Caller, - &i.RequestKey, - &i.TraceContext, - ) - return i, err -} - -const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id -WHERE d.min_replicas > 0 - AND j.start_time < $1::TIMESTAMPTZ - AND ( - j.last_execution IS NULL - OR NOT EXISTS ( - SELECT 1 - FROM async_calls ac - WHERE - ac.cron_job_key = j.key - AND ac.scheduled_at > j.last_execution::TIMESTAMPTZ - ) - ) -FOR UPDATE SKIP LOCKED -` - -type GetUnscheduledCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - LastExecution optional.Option[time.Time] -} - -func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, getUnscheduledCronJobs, startTime) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetUnscheduledCronJobsRow - for rows.Next() { - var i GetUnscheduledCronJobsRow - if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.LastExecution, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const insertSubscriber = `-- name: InsertSubscriber :exec -INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_id, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb -) -VALUES ( - $1::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - INNER JOIN modules ON topic_subscriptions.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topic_subscriptions.name = $3::TEXT - ), - (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5, - $6, - $7::interval, - $8::interval, - $9 -) -` - -type InsertSubscriberParams struct { - Key model.SubscriberKey - Module string - SubscriptionName string - Deployment model.DeploymentKey - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] -} - -func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { - _, err := q.db.ExecContext(ctx, insertSubscriber, - arg.Key, - arg.Module, - arg.SubscriptionName, - arg.Deployment, - arg.Sink, - arg.RetryAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - ) - return err -} - -const insertTimelineCallEvent = `-- name: InsertTimelineCallEvent :exec -INSERT INTO timeline ( - deployment_id, - request_id, - parent_request_id, - time_stamp, - type, - custom_key_1, - custom_key_2, - custom_key_3, - custom_key_4, - payload -) -VALUES ( - (SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), - (CASE - WHEN $2::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) - END), - (CASE - WHEN $3::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $3::TEXT) - END), - $4::TIMESTAMPTZ, - 'call', - $5::TEXT, - $6::TEXT, - $7::TEXT, - $8::TEXT, - $9 -) -` - -type InsertTimelineCallEventParams struct { - DeploymentKey model.DeploymentKey - RequestKey optional.Option[string] - ParentRequestKey optional.Option[string] - TimeStamp time.Time - SourceModule optional.Option[string] - SourceVerb optional.Option[string] - DestModule string - DestVerb string - Payload []byte -} - -func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineCallEvent, - arg.DeploymentKey, - arg.RequestKey, - arg.ParentRequestKey, - arg.TimeStamp, - arg.SourceModule, - arg.SourceVerb, - arg.DestModule, - arg.DestVerb, - arg.Payload, - ) - return err -} - -const insertTimelineDeploymentCreatedEvent = `-- name: InsertTimelineDeploymentCreatedEvent :exec -INSERT INTO timeline ( - deployment_id, - type, - custom_key_1, - custom_key_2, - payload -) -VALUES ( - ( - SELECT id - FROM deployments - WHERE deployments.key = $1::deployment_key - ), - 'deployment_created', - $2::TEXT, - $3::TEXT, - $4 -) -` - -type InsertTimelineDeploymentCreatedEventParams struct { - DeploymentKey model.DeploymentKey - Language string - ModuleName string - Payload []byte -} - -func (q *Queries) InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineDeploymentCreatedEvent, - arg.DeploymentKey, - arg.Language, - arg.ModuleName, - arg.Payload, - ) - return err -} - -const insertTimelineDeploymentUpdatedEvent = `-- name: InsertTimelineDeploymentUpdatedEvent :exec -INSERT INTO timeline ( - deployment_id, - type, - custom_key_1, - custom_key_2, - payload -) -VALUES ( - ( - SELECT id - FROM deployments - WHERE deployments.key = $1::deployment_key - ), - 'deployment_updated', - $2::TEXT, - $3::TEXT, - $4 -) -` - -type InsertTimelineDeploymentUpdatedEventParams struct { - DeploymentKey model.DeploymentKey - Language string - ModuleName string - Payload []byte -} - -func (q *Queries) InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineDeploymentUpdatedEvent, - arg.DeploymentKey, - arg.Language, - arg.ModuleName, - arg.Payload, - ) - return err -} - -const insertTimelineEvent = `-- name: InsertTimelineEvent :exec -INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, - custom_key_1, custom_key_2, custom_key_3, custom_key_4, - payload) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) -RETURNING id -` - -type InsertTimelineEventParams struct { - DeploymentID int64 - RequestID optional.Option[int64] - ParentRequestID optional.Option[string] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload []byte -} - -func (q *Queries) InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineEvent, - arg.DeploymentID, - arg.RequestID, - arg.ParentRequestID, - arg.Type, - arg.CustomKey1, - arg.CustomKey2, - arg.CustomKey3, - arg.CustomKey4, - arg.Payload, - ) - return err -} - -const insertTimelineLogEvent = `-- name: InsertTimelineLogEvent :exec -INSERT INTO timeline ( - deployment_id, - request_id, - time_stamp, - custom_key_1, - type, - payload -) -VALUES ( - (SELECT id FROM deployments d WHERE d.key = $1::deployment_key LIMIT 1), - ( - CASE - WHEN $2::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT LIMIT 1) - END - ), - $3::TIMESTAMPTZ, - $4::INT, - 'log', - $5 -) -` - -type InsertTimelineLogEventParams struct { - DeploymentKey model.DeploymentKey - RequestKey optional.Option[string] - TimeStamp time.Time - Level int32 - Payload []byte -} - -func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineLogEvent, - arg.DeploymentKey, - arg.RequestKey, - arg.TimeStamp, - arg.Level, - arg.Payload, - ) - return err -} - -const isCronJobPending = `-- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = $1::cron_job_key - AND ac.scheduled_at > $2::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending -` - -func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { - row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) - var pending bool - err := row.Scan(&pending) - return pending, err -} - -const killStaleControllers = `-- name: KillStaleControllers :one -WITH matches AS ( - UPDATE controller - SET state = 'dead' - WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL - RETURNING 1) -SELECT COUNT(*) -FROM matches -` - -// Mark any controller entries that haven't been updated recently as dead. -func (q *Queries) KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) { - row := q.db.QueryRowContext(ctx, killStaleControllers, timeout) - var count int64 - err := row.Scan(&count) - return count, err -} - -const killStaleRunners = `-- name: KillStaleRunners :one -WITH matches AS ( - UPDATE runners - SET state = 'dead', - deployment_id = NULL - WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL - RETURNING 1) -SELECT COUNT(*) -FROM matches -` - -func (q *Queries) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) { - row := q.db.QueryRowContext(ctx, killStaleRunners, timeout) - var count int64 - err := row.Scan(&count) - return count, err -} - -const loadAsyncCall = `-- name: LoadAsyncCall :one -SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context, cron_job_key -FROM async_calls -WHERE id = $1 -` - -func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) { - row := q.db.QueryRowContext(ctx, loadAsyncCall, id) - var i AsyncCall - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.LeaseID, - &i.Verb, - &i.State, - &i.Origin, - &i.ScheduledAt, - &i.Request, - &i.Response, - &i.Error, - &i.RemainingAttempts, - &i.Backoff, - &i.MaxBackoff, - &i.CatchVerb, - &i.Catching, - &i.ParentRequestKey, - &i.TraceContext, - &i.CronJobKey, - ) - return i, err -} - -const newLease = `-- name: NewLease :one -INSERT INTO leases ( - idempotency_key, - key, - expires_at, - metadata -) -VALUES ( - gen_random_uuid(), - $1::lease_key, - (NOW() AT TIME ZONE 'utc') + $2::interval, - $3::JSONB -) -RETURNING idempotency_key -` - -func (q *Queries) NewLease(ctx context.Context, key leases.Key, ttl sqltypes.Duration, metadata pqtype.NullRawMessage) (uuid.UUID, error) { - row := q.db.QueryRowContext(ctx, newLease, key, ttl, metadata) - var idempotency_key uuid.UUID - err := row.Scan(&idempotency_key) - return idempotency_key, err -} - -const popNextFSMEvent = `-- name: PopNextFSMEvent :one -DELETE FROM fsm_next_event -WHERE fsm_instance_id = ( - SELECT id - FROM fsm_instances - WHERE fsm = $1::schema_ref AND key = $2 -) -RETURNING id, created_at, fsm_instance_id, next_state, request, request_type -` - -func (q *Queries) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string) (FsmNextEvent, error) { - row := q.db.QueryRowContext(ctx, popNextFSMEvent, fsm, instanceKey) - var i FsmNextEvent - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.FsmInstanceID, - &i.NextState, - &i.Request, - &i.RequestType, - ) - return i, err -} - -const publishEventForTopic = `-- name: PublishEventForTopic :exec -INSERT INTO topic_events ( - "key", - topic_id, - caller, - payload, - request_key, - trace_context - ) -VALUES ( - $1::topic_event_key, - ( - SELECT topics.id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topics.name = $3::TEXT - ), - $4::TEXT, - $5, - $6::TEXT, - $7::jsonb -) -` - -type PublishEventForTopicParams struct { - Key model.TopicEventKey - Module string - Topic string - Caller string - Payload []byte - RequestKey string - TraceContext json.RawMessage -} - -func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error { - _, err := q.db.ExecContext(ctx, publishEventForTopic, - arg.Key, - arg.Module, - arg.Topic, - arg.Caller, - arg.Payload, - arg.RequestKey, - arg.TraceContext, - ) - return err -} - -const releaseLease = `-- name: ReleaseLease :one -DELETE FROM leases -WHERE idempotency_key = $1 AND key = $2::lease_key -RETURNING true -` - -func (q *Queries) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { - row := q.db.QueryRowContext(ctx, releaseLease, idempotencyKey, key) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const renewLease = `-- name: RenewLease :one -UPDATE leases -SET expires_at = (NOW() AT TIME ZONE 'utc') + $1::interval -WHERE idempotency_key = $2 AND key = $3::lease_key -RETURNING true -` - -func (q *Queries) RenewLease(ctx context.Context, ttl sqltypes.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { - row := q.db.QueryRowContext(ctx, renewLease, ttl, idempotencyKey, key) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const reserveRunner = `-- name: ReserveRunner :one -UPDATE runners -SET state = 'reserved', - reservation_timeout = $1::timestamptz, - -- If a deployment is not found, then the deployment ID is -1 - -- and the update will fail due to a FK constraint. - deployment_id = COALESCE((SELECT id - FROM deployments d - WHERE d.key = $2::deployment_key - LIMIT 1), -1) -WHERE id = (SELECT id - FROM runners r - WHERE r.state = 'idle' - AND r.labels @> $3::jsonb - LIMIT 1 FOR UPDATE SKIP LOCKED) -RETURNING runners.id, runners.key, runners.created, runners.last_seen, runners.reservation_timeout, runners.state, runners.endpoint, runners.module_name, runners.deployment_id, runners.labels -` - -// Find an idle runner and reserve it for the given deployment. -func (q *Queries) ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels json.RawMessage) (Runner, error) { - row := q.db.QueryRowContext(ctx, reserveRunner, reservationTimeout, deploymentKey, labels) - var i Runner - err := row.Scan( - &i.ID, - &i.Key, - &i.Created, - &i.LastSeen, - &i.ReservationTimeout, - &i.State, - &i.Endpoint, - &i.ModuleName, - &i.DeploymentID, - &i.Labels, - ) - return i, err -} - -const setDeploymentDesiredReplicas = `-- name: SetDeploymentDesiredReplicas :exec -UPDATE deployments -SET min_replicas = $2 -WHERE key = $1::deployment_key -RETURNING 1 -` - -func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error { - _, err := q.db.ExecContext(ctx, setDeploymentDesiredReplicas, key, minReplicas) - return err -} - -const setNextFSMEvent = `-- name: SetNextFSMEvent :one -INSERT INTO fsm_next_event (fsm_instance_id, next_state, request, request_type) -VALUES ( - (SELECT id FROM fsm_instances WHERE fsm = $1::schema_ref AND key = $2), - $3, - $4, - $5::schema_type -) -RETURNING id -` - -type SetNextFSMEventParams struct { - Fsm schema.RefKey - InstanceKey string - Event schema.RefKey - Request []byte - RequestType Type -} - -func (q *Queries) SetNextFSMEvent(ctx context.Context, arg SetNextFSMEventParams) (int64, error) { - row := q.db.QueryRowContext(ctx, setNextFSMEvent, - arg.Fsm, - arg.InstanceKey, - arg.Event, - arg.Request, - arg.RequestType, - ) - var id int64 - err := row.Scan(&id) - return id, err -} - -const setSubscriptionCursor = `-- name: SetSubscriptionCursor :exec -WITH event AS ( - SELECT id, created_at, key, topic_id, payload - FROM topic_events - WHERE "key" = $2::topic_event_key -) -UPDATE topic_subscriptions -SET cursor = (SELECT id FROM event) -WHERE key = $1::subscription_key -` - -func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error { - _, err := q.db.ExecContext(ctx, setSubscriptionCursor, column1, column2) - return err -} - -const startFSMTransition = `-- name: StartFSMTransition :one -INSERT INTO fsm_instances ( - fsm, - key, - destination_state, - async_call_id -) VALUES ( - $1, - $2, - $3::schema_ref, - $4::BIGINT -) -ON CONFLICT(fsm, key) DO -UPDATE SET - destination_state = $3::schema_ref, - async_call_id = $4::BIGINT, - updated_at = NOW() AT TIME ZONE 'utc' -WHERE - fsm_instances.async_call_id IS NULL - AND fsm_instances.destination_state IS NULL -RETURNING id, created_at, fsm, key, status, current_state, destination_state, async_call_id, updated_at -` - -type StartFSMTransitionParams struct { - Fsm schema.RefKey - Key string - DestinationState schema.RefKey - AsyncCallID int64 -} - -// Start a new FSM transition, populating the destination state and async call ID. -// -// "key" is the unique identifier for the FSM execution. -func (q *Queries) StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) { - row := q.db.QueryRowContext(ctx, startFSMTransition, - arg.Fsm, - arg.Key, - arg.DestinationState, - arg.AsyncCallID, - ) - var i FsmInstance - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.Fsm, - &i.Key, - &i.Status, - &i.CurrentState, - &i.DestinationState, - &i.AsyncCallID, - &i.UpdatedAt, - ) - return i, err -} - -const succeedAsyncCall = `-- name: SucceedAsyncCall :one -UPDATE async_calls -SET - state = 'success'::async_call_state, - response = $1, - error = null -WHERE id = $2 -RETURNING true -` - -func (q *Queries) SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error) { - row := q.db.QueryRowContext(ctx, succeedAsyncCall, response, iD) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const succeedFSMInstance = `-- name: SucceedFSMInstance :one -UPDATE fsm_instances -SET - current_state = destination_state, - destination_state = NULL, - async_call_id = NULL, - status = 'completed'::fsm_status, - updated_at = NOW() AT TIME ZONE 'utc' -WHERE - fsm = $1::schema_ref AND key = $2::TEXT -RETURNING true -` - -func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRowContext(ctx, succeedFSMInstance, fsm, key) - var column_1 bool - err := row.Scan(&column_1) - return column_1, err -} - -const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec -UPDATE cron_jobs - SET last_execution = $1::TIMESTAMPTZ, - next_execution = $2::TIMESTAMPTZ - WHERE key = $3::cron_job_key +const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec +UPDATE cron_jobs + SET last_execution = $1::TIMESTAMPTZ, + next_execution = $2::TIMESTAMPTZ + WHERE key = $3::cron_job_key ` func (q *Queries) UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error { _, err := q.db.ExecContext(ctx, updateCronJobExecution, lastExecution, nextExecution, key) return err } - -const upsertController = `-- name: UpsertController :one -INSERT INTO controller (key, endpoint) -VALUES ($1, $2) -ON CONFLICT (key) DO UPDATE SET state = 'live', - endpoint = $2, - last_seen = NOW() AT TIME ZONE 'utc' -RETURNING id -` - -func (q *Queries) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) { - row := q.db.QueryRowContext(ctx, upsertController, key, endpoint) - var id int64 - err := row.Scan(&id) - return id, err -} - -const upsertModule = `-- name: UpsertModule :one -INSERT INTO modules (language, name) -VALUES ($1, $2) -ON CONFLICT (name) DO UPDATE SET language = $1 -RETURNING id -` - -func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error) { - row := q.db.QueryRowContext(ctx, upsertModule, language, name) - var id int64 - err := row.Scan(&id) - return id, err -} - -const upsertRunner = `-- name: UpsertRunner :one -WITH deployment_rel AS ( - SELECT CASE - WHEN $5::deployment_key IS NULL - THEN NULL - ELSE COALESCE((SELECT id - FROM deployments d - WHERE d.key = $5::deployment_key - LIMIT 1), -1) END AS id) -INSERT -INTO runners (key, endpoint, state, labels, deployment_id, last_seen) -VALUES ($1, - $2, - $3, - $4, - (SELECT id FROM deployment_rel), - NOW() AT TIME ZONE 'utc') -ON CONFLICT (key) DO UPDATE SET endpoint = $2, - state = $3, - labels = $4, - deployment_id = (SELECT id FROM deployment_rel), - last_seen = NOW() AT TIME ZONE 'utc' -RETURNING deployment_id -` - -type UpsertRunnerParams struct { - Key model.RunnerKey - Endpoint string - State RunnerState - Labels json.RawMessage - DeploymentKey optional.Option[model.DeploymentKey] -} - -// Upsert a runner and return the deployment ID that it is assigned to, if any. -// If the deployment key is null, then deployment_rel.id will be null, -// otherwise we try to retrieve the deployments.id using the key. If -// there is no corresponding deployment, then the deployment ID is -1 -// and the parent statement will fail due to a foreign key constraint. -func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error) { - row := q.db.QueryRowContext(ctx, upsertRunner, - arg.Key, - arg.Endpoint, - arg.State, - arg.Labels, - arg.DeploymentKey, - ) - var deployment_id optional.Option[int64] - err := row.Scan(&deployment_id) - return deployment_id, err -} - -const upsertSubscription = `-- name: UpsertSubscription :one -INSERT INTO topic_subscriptions ( - key, - topic_id, - module_id, - deployment_id, - name) -VALUES ( - $1::subscription_key, - ( - SELECT topics.id as id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topics.name = $3::TEXT - ), - (SELECT id FROM modules WHERE name = $4::TEXT), - (SELECT id FROM deployments WHERE key = $5::deployment_key), - $6::TEXT -) -ON CONFLICT (name, module_id) DO -UPDATE SET - topic_id = excluded.topic_id, - deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) -RETURNING - id, - CASE - WHEN xmax = 0 THEN true - ELSE false - END AS inserted -` - -type UpsertSubscriptionParams struct { - Key model.SubscriptionKey - TopicModule string - TopicName string - Module string - Deployment model.DeploymentKey - Name string -} - -type UpsertSubscriptionRow struct { - ID int64 - Inserted bool -} - -func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) { - row := q.db.QueryRowContext(ctx, upsertSubscription, - arg.Key, - arg.TopicModule, - arg.TopicName, - arg.Module, - arg.Deployment, - arg.Name, - ) - var i UpsertSubscriptionRow - err := row.Scan(&i.ID, &i.Inserted) - return i, err -} - -const upsertTopic = `-- name: UpsertTopic :exec -INSERT INTO topics (key, module_id, name, type) -VALUES ( - $1::topic_key, - (SELECT id FROM modules WHERE name = $2::TEXT LIMIT 1), - $3::TEXT, - $4::TEXT -) -ON CONFLICT (name, module_id) DO -UPDATE SET - type = $4::TEXT -RETURNING id -` - -type UpsertTopicParams struct { - Topic model.TopicKey - Module string - Name string - EventType string -} - -func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error { - _, err := q.db.ExecContext(ctx, upsertTopic, - arg.Topic, - arg.Module, - arg.Name, - arg.EventType, - ) - return err -} diff --git a/backend/controller/sql/async_queries.sql b/backend/controller/sql/async_queries.sql new file mode 100644 index 0000000000..cdedc4c9b0 --- /dev/null +++ b/backend/controller/sql/async_queries.sql @@ -0,0 +1,37 @@ +-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context, + cron_job_key +) +VALUES ( + @scheduled_at::TIMESTAMPTZ, + @verb, + @origin, + @request, + @remaining_attempts, + @backoff::interval, + @max_backoff::interval, + @catch_verb, + @parent_request_key, + @trace_context::jsonb, + @cron_job_key +) +RETURNING id; + +-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.cron_job_key = sqlc.arg('key')::cron_job_key + AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending; diff --git a/backend/controller/sql/async_queries.sql.go b/backend/controller/sql/async_queries.sql.go new file mode 100644 index 0000000000..cc4a119160 --- /dev/null +++ b/backend/controller/sql/async_queries.sql.go @@ -0,0 +1,97 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: async_queries.sql + +package sql + +import ( + "context" + "encoding/json" + "time" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" +) + +const createAsyncCall = `-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context, + cron_job_key +) +VALUES ( + $1::TIMESTAMPTZ, + $2, + $3, + $4, + $5, + $6::interval, + $7::interval, + $8, + $9, + $10::jsonb, + $11 +) +RETURNING id +` + +type CreateAsyncCallParams struct { + ScheduledAt time.Time + Verb schema.RefKey + Origin string + Request []byte + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext json.RawMessage + CronJobKey optional.Option[model.CronJobKey] +} + +func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createAsyncCall, + arg.ScheduledAt, + arg.Verb, + arg.Origin, + arg.Request, + arg.RemainingAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, + arg.CronJobKey, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.cron_job_key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending +` + +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err +} diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 883d140976..17f10334f0 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -105,48 +105,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 2a87c658b6..c7d0c5e999 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -470,35 +470,6 @@ FROM expired; -- name: GetLeaseInfo :one SELECT expires_at, metadata FROM leases WHERE key = @key::lease_key; --- name: CreateAsyncCall :one -INSERT INTO async_calls ( - scheduled_at, - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context, - cron_job_key -) -VALUES ( - @scheduled_at::TIMESTAMPTZ, - @verb, - @origin, - @request, - @remaining_attempts, - @backoff::interval, - @max_backoff::interval, - @catch_verb, - @parent_request_key, - @trace_context::jsonb, - @cron_job_key -) -RETURNING id; - -- name: AsyncCallQueueDepth :one SELECT count(*) FROM async_calls diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 4ae26b987b..5d5f08edfb 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -189,69 +189,6 @@ func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []b return id, err } -const createAsyncCall = `-- name: CreateAsyncCall :one -INSERT INTO async_calls ( - scheduled_at, - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context, - cron_job_key -) -VALUES ( - $1::TIMESTAMPTZ, - $2, - $3, - $4, - $5, - $6::interval, - $7::interval, - $8, - $9, - $10::jsonb, - $11 -) -RETURNING id -` - -type CreateAsyncCallParams struct { - ScheduledAt time.Time - Verb schema.RefKey - Origin string - Request []byte - RemainingAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] - ParentRequestKey optional.Option[string] - TraceContext json.RawMessage - CronJobKey optional.Option[model.CronJobKey] -} - -func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { - row := q.db.QueryRowContext(ctx, createAsyncCall, - arg.ScheduledAt, - arg.Verb, - arg.Origin, - arg.Request, - arg.RemainingAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - arg.ParentRequestKey, - arg.TraceContext, - arg.CronJobKey, - ) - var id int64 - err := row.Scan(&id) - return id, err -} - const createCronJob = `-- name: CreateCronJob :exec INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) VALUES ( @@ -867,7 +804,7 @@ func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]G } const getCronJobByKey = `-- name: GetCronJobByKey :one -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE j.key = $1::cron_job_key @@ -875,28 +812,30 @@ FOR UPDATE SKIP LOCKED ` type GetCronJobByKeyRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - LastExecution optional.Option[time.Time] + CronJob CronJob + Deployment Deployment } func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) { row := q.db.QueryRowContext(ctx, getCronJobByKey, key) var i GetCronJobByKeyRow err := row.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.LastExecution, + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, ) return i, err } @@ -1847,7 +1786,7 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent } const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.last_execution +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 @@ -1866,14 +1805,8 @@ FOR UPDATE SKIP LOCKED ` type GetUnscheduledCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - LastExecution optional.Option[time.Time] + CronJob CronJob + Deployment Deployment } func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) { @@ -1886,14 +1819,22 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim for rows.Next() { var i GetUnscheduledCronJobsRow if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.LastExecution, + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, ); err != nil { return nil, err } @@ -2178,23 +2119,6 @@ func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimeline return err } -const isCronJobPending = `-- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = $1::cron_job_key - AND ac.scheduled_at > $2::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending -` - -func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { - row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) - var pending bool - err := row.Scan(&pending) - return pending, err -} - const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller diff --git a/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql index fe4bf470ba..93615d0e6c 100644 --- a/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql +++ b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql @@ -18,5 +18,7 @@ ALTER TABLE cron_jobs DROP COLUMN state, ADD COLUMN last_execution TIMESTAMPTZ; +DROP TYPE cron_job_state; + -- migrate:down diff --git a/internal/configuration/sql/models.go b/internal/configuration/sql/models.go index 883d140976..17f10334f0 100644 --- a/internal/configuration/sql/models.go +++ b/internal/configuration/sql/models.go @@ -105,48 +105,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( diff --git a/sqlc.yaml b/sqlc.yaml index eede42ca15..5cde27c31c 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -4,6 +4,7 @@ sql: engine: "postgresql" queries: - backend/controller/sql/queries.sql + - backend/controller/sql/async_queries.sql # FIXME: Until we fully decouple cron from the controller, we need to include the cron queries here - backend/controller/cronjobs/sql/queries.sql schema: "backend/controller/sql/schema" @@ -76,8 +77,6 @@ sql: nullable: true go_type: type: "optional.Option[model.DeploymentKey]" - - db_type: "cron_job_state" - go_type: "github.com/TBD54566975/ftl/internal/model.CronJobState" - db_type: "controller_key" go_type: "github.com/TBD54566975/ftl/internal/model.ControllerKey" - db_type: "request_key" @@ -143,8 +142,7 @@ sql: - <<: *daldir queries: - backend/controller/cronjobs/sql/queries.sql - # FIXME: We need to create async calls in the same transaction - - backend/controller/sql/queries.sql + - backend/controller/sql/async_queries.sql gen: go: <<: *gengo @@ -210,8 +208,6 @@ sql: nullable: true go_type: type: "optional.Option[model.DeploymentKey]" - - db_type: "cron_job_state" - go_type: "github.com/TBD54566975/ftl/internal/model.CronJobState" - db_type: "controller_key" go_type: "github.com/TBD54566975/ftl/internal/model.ControllerKey" - db_type: "request_key"