From 7810fc88e08c4e1f012da8f1046689f8c1bed6d4 Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Thu, 22 Aug 2024 16:01:33 -0700 Subject: [PATCH] use cron_jobs.last_async_call_id instead of async_calls.cron_job_key --- backend/controller/cronjobs/cronjobs.go | 11 ++-- .../cronjobs/sql/async_queries.sql.go | 16 +++-- backend/controller/cronjobs/sql/models.go | 20 +++---- backend/controller/cronjobs/sql/querier.go | 2 +- backend/controller/cronjobs/sql/queries.sql | 13 ++--- .../controller/cronjobs/sql/queries.sql.go | 39 ++++++++----- backend/controller/sql/async_queries.sql | 11 ++-- backend/controller/sql/async_queries.sql.go | 16 +++-- backend/controller/sql/models.go | 20 +++---- backend/controller/sql/querier.go | 2 +- backend/controller/sql/queries.sql.go | 58 +++++++++++-------- ...0240815164808_async_calls_cron_job_key.sql | 22 +++---- internal/configuration/sql/models.go | 20 +++---- sqlc.yaml | 10 ++++ 14 files changed, 139 insertions(+), 121 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 2145305011..a5d30e24e9 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" - "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" cronsql "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql" @@ -173,7 +172,7 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob } logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) - _, err = tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{ + id, err := tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{ ScheduledAt: nextAttemptForJob, Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, Origin: fmt.Sprintf("cron:%s", job.Key), @@ -181,7 +180,6 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob RemainingAttempts: 0, Backoff: 0, MaxBackoff: 0, - CronJobKey: optional.Some(job.Key), }) if err != nil { return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err) @@ -191,7 +189,12 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob return fmt.Errorf("failed to calculate future execution for cron job %q with schedule %q: %w", job.Key, job.Schedule, err) } logger.Tracef("Updating cron job %q with last attempt at %s and next attempt at %s", job.Key, nextAttemptForJob, futureAttemptForJob) - err = tx.db.UpdateCronJobExecution(ctx, nextAttemptForJob, futureAttemptForJob, job.Key) + err = tx.db.UpdateCronJobExecution(ctx, cronsql.UpdateCronJobExecutionParams{ + LastAsyncCallID: id, + LastExecution: nextAttemptForJob, + NextExecution: futureAttemptForJob, + Key: job.Key, + }) if err != nil { return fmt.Errorf("failed to update cron job %q: %w", job.Key, err) } diff --git a/backend/controller/cronjobs/sql/async_queries.sql.go b/backend/controller/cronjobs/sql/async_queries.sql.go index cc4a119160..07b6cc3b35 100644 --- a/backend/controller/cronjobs/sql/async_queries.sql.go +++ b/backend/controller/cronjobs/sql/async_queries.sql.go @@ -12,6 +12,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" ) @@ -27,8 +28,7 @@ INSERT INTO async_calls ( max_backoff, catch_verb, parent_request_key, - trace_context, - cron_job_key + trace_context ) VALUES ( $1::TIMESTAMPTZ, @@ -40,8 +40,7 @@ VALUES ( $7::interval, $8, $9, - $10::jsonb, - $11 + $10::jsonb ) RETURNING id ` @@ -50,14 +49,13 @@ type CreateAsyncCallParams struct { ScheduledAt time.Time Verb schema.RefKey Origin string - Request []byte + Request encryption.EncryptedAsyncColumn RemainingAttempts int32 Backoff sqltypes.Duration MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] ParentRequestKey optional.Option[string] TraceContext json.RawMessage - CronJobKey optional.Option[model.CronJobKey] } func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { @@ -72,7 +70,6 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams arg.CatchVerb, arg.ParentRequestKey, arg.TraceContext, - arg.CronJobKey, ) var id int64 err := row.Scan(&id) @@ -82,8 +79,9 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams const isCronJobPending = `-- name: IsCronJobPending :one SELECT EXISTS ( SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = $1::cron_job_key + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key AND ac.scheduled_at > $2::TIMESTAMPTZ AND ac.state = 'pending' ) AS pending diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index 42608bbe04..039df94a07 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -347,7 +347,6 @@ type AsyncCall struct { Catching bool ParentRequestKey optional.Option[string] TraceContext pqtype.NullRawMessage - CronJobKey optional.Option[model.CronJobKey] } type Controller struct { @@ -360,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - ModuleName string - LastExecution optional.Option[time.Time] + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/backend/controller/cronjobs/sql/querier.go b/backend/controller/cronjobs/sql/querier.go index 4c726cbf5a..7f1b44088a 100644 --- a/backend/controller/cronjobs/sql/querier.go +++ b/backend/controller/cronjobs/sql/querier.go @@ -17,7 +17,7 @@ type Querier interface { GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) - UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error + UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error } var _ Querier = (*Queries)(nil) diff --git a/backend/controller/cronjobs/sql/queries.sql b/backend/controller/cronjobs/sql/queries.sql index 251e5365a6..ff58ba35fa 100644 --- a/backend/controller/cronjobs/sql/queries.sql +++ b/backend/controller/cronjobs/sql/queries.sql @@ -5,16 +5,12 @@ FROM cron_jobs j WHERE d.min_replicas > 0 AND j.start_time < sqlc.arg('start_time')::TIMESTAMPTZ AND ( - j.last_execution IS NULL + j.last_async_call_id IS NULL OR NOT EXISTS ( SELECT 1 FROM async_calls ac - WHERE - ac.cron_job_key = j.key - AND ( - ac.scheduled_at > j.last_execution OR - (ac.scheduled_at = j.last_execution AND ac.state IN ('pending', 'executing')) - ) + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') ) ) FOR UPDATE SKIP LOCKED; @@ -39,6 +35,7 @@ INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_ti -- name: UpdateCronJobExecution :exec UPDATE cron_jobs - SET last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ, + SET last_async_call_id = sqlc.arg('last_async_call_id')::BIGINT, + last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ, next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ WHERE key = sqlc.arg('key')::cron_job_key; \ No newline at end of file diff --git a/backend/controller/cronjobs/sql/queries.sql.go b/backend/controller/cronjobs/sql/queries.sql.go index 8c11e08a16..5aa1e39123 100644 --- a/backend/controller/cronjobs/sql/queries.sql.go +++ b/backend/controller/cronjobs/sql/queries.sql.go @@ -48,7 +48,7 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er } const getCronJobByKey = `-- name: GetCronJobByKey :one -SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE j.key = $1::cron_job_key @@ -73,6 +73,7 @@ func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (Ge &i.CronJob.NextExecution, &i.CronJob.ModuleName, &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, &i.Deployment.ID, &i.Deployment.CreatedAt, &i.Deployment.ModuleID, @@ -85,22 +86,18 @@ func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (Ge } const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many -SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 AND j.start_time < $1::TIMESTAMPTZ AND ( - j.last_execution IS NULL + j.last_async_call_id IS NULL OR NOT EXISTS ( SELECT 1 FROM async_calls ac - WHERE - ac.cron_job_key = j.key - AND ( - ac.scheduled_at > j.last_execution OR - (ac.scheduled_at = j.last_execution AND ac.state IN ('pending', 'executing')) - ) + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') ) ) FOR UPDATE SKIP LOCKED @@ -130,6 +127,7 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim &i.CronJob.NextExecution, &i.CronJob.ModuleName, &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, &i.Deployment.ID, &i.Deployment.CreatedAt, &i.Deployment.ModuleID, @@ -153,12 +151,25 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec UPDATE cron_jobs - SET last_execution = $1::TIMESTAMPTZ, - next_execution = $2::TIMESTAMPTZ - WHERE key = $3::cron_job_key + SET last_async_call_id = $1::BIGINT, + last_execution = $2::TIMESTAMPTZ, + next_execution = $3::TIMESTAMPTZ + WHERE key = $4::cron_job_key ` -func (q *Queries) UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error { - _, err := q.db.ExecContext(ctx, updateCronJobExecution, lastExecution, nextExecution, key) +type UpdateCronJobExecutionParams struct { + LastAsyncCallID int64 + LastExecution time.Time + NextExecution time.Time + Key model.CronJobKey +} + +func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error { + _, err := q.db.ExecContext(ctx, updateCronJobExecution, + arg.LastAsyncCallID, + arg.LastExecution, + arg.NextExecution, + arg.Key, + ) return err } diff --git a/backend/controller/sql/async_queries.sql b/backend/controller/sql/async_queries.sql index cdedc4c9b0..1f69718a83 100644 --- a/backend/controller/sql/async_queries.sql +++ b/backend/controller/sql/async_queries.sql @@ -9,8 +9,7 @@ INSERT INTO async_calls ( max_backoff, catch_verb, parent_request_key, - trace_context, - cron_job_key + trace_context ) VALUES ( @scheduled_at::TIMESTAMPTZ, @@ -22,16 +21,16 @@ VALUES ( @max_backoff::interval, @catch_verb, @parent_request_key, - @trace_context::jsonb, - @cron_job_key + @trace_context::jsonb ) RETURNING id; -- name: IsCronJobPending :one SELECT EXISTS ( SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = sqlc.arg('key')::cron_job_key + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = sqlc.arg('key')::cron_job_key AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ AND ac.state = 'pending' ) AS pending; diff --git a/backend/controller/sql/async_queries.sql.go b/backend/controller/sql/async_queries.sql.go index cc4a119160..07b6cc3b35 100644 --- a/backend/controller/sql/async_queries.sql.go +++ b/backend/controller/sql/async_queries.sql.go @@ -12,6 +12,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" ) @@ -27,8 +28,7 @@ INSERT INTO async_calls ( max_backoff, catch_verb, parent_request_key, - trace_context, - cron_job_key + trace_context ) VALUES ( $1::TIMESTAMPTZ, @@ -40,8 +40,7 @@ VALUES ( $7::interval, $8, $9, - $10::jsonb, - $11 + $10::jsonb ) RETURNING id ` @@ -50,14 +49,13 @@ type CreateAsyncCallParams struct { ScheduledAt time.Time Verb schema.RefKey Origin string - Request []byte + Request encryption.EncryptedAsyncColumn RemainingAttempts int32 Backoff sqltypes.Duration MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] ParentRequestKey optional.Option[string] TraceContext json.RawMessage - CronJobKey optional.Option[model.CronJobKey] } func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { @@ -72,7 +70,6 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams arg.CatchVerb, arg.ParentRequestKey, arg.TraceContext, - arg.CronJobKey, ) var id int64 err := row.Scan(&id) @@ -82,8 +79,9 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams const isCronJobPending = `-- name: IsCronJobPending :one SELECT EXISTS ( SELECT 1 - FROM async_calls ac - WHERE ac.cron_job_key = $1::cron_job_key + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key AND ac.scheduled_at > $2::TIMESTAMPTZ AND ac.state = 'pending' ) AS pending diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 42608bbe04..039df94a07 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -347,7 +347,6 @@ type AsyncCall struct { Catching bool ParentRequestKey optional.Option[string] TraceContext pqtype.NullRawMessage - CronJobKey optional.Option[model.CronJobKey] } type Controller struct { @@ -360,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - ModuleName string - LastExecution optional.Option[time.Time] + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 535f049bf2..33293fc322 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -117,8 +117,8 @@ type Querier interface { StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) SucceedAsyncCall(ctx context.Context, response encryption.OptionalEncryptedAsyncColumn, iD int64) (bool, error) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) + UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error - UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 3448fdff61..02cca852f6 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -440,7 +440,7 @@ WITH updated AS ( SET state = 'error'::async_call_state, error = $7::TEXT WHERE id = $8::BIGINT - RETURNING id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context, cron_job_key + RETURNING id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context ) INSERT INTO async_calls ( verb, @@ -805,7 +805,7 @@ func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]G } const getCronJobByKey = `-- name: GetCronJobByKey :one -SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE j.key = $1::cron_job_key @@ -830,6 +830,7 @@ func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (Ge &i.CronJob.NextExecution, &i.CronJob.ModuleName, &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, &i.Deployment.ID, &i.Deployment.CreatedAt, &i.Deployment.ModuleID, @@ -1793,22 +1794,18 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent } const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many -SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 AND j.start_time < $1::TIMESTAMPTZ AND ( - j.last_execution IS NULL + j.last_async_call_id IS NULL OR NOT EXISTS ( SELECT 1 FROM async_calls ac - WHERE - ac.cron_job_key = j.key - AND ( - ac.scheduled_at > j.last_execution OR - (ac.scheduled_at = j.last_execution AND ac.state IN ('pending', 'executing')) - ) + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') ) ) FOR UPDATE SKIP LOCKED @@ -1838,6 +1835,7 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim &i.CronJob.NextExecution, &i.CronJob.ModuleName, &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, &i.Deployment.ID, &i.Deployment.CreatedAt, &i.Deployment.ModuleID, @@ -2166,7 +2164,7 @@ func (q *Queries) KillStaleRunners(ctx context.Context, timeout sqltypes.Duratio } const loadAsyncCall = `-- name: LoadAsyncCall :one -SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context, cron_job_key +SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context FROM async_calls WHERE id = $1 ` @@ -2192,7 +2190,6 @@ func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error &i.Catching, &i.ParentRequestKey, &i.TraceContext, - &i.CronJobKey, ) return i, err } @@ -2508,6 +2505,31 @@ func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key return column_1, err } +const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec +UPDATE cron_jobs + SET last_async_call_id = $1::BIGINT, + last_execution = $2::TIMESTAMPTZ, + next_execution = $3::TIMESTAMPTZ + WHERE key = $4::cron_job_key +` + +type UpdateCronJobExecutionParams struct { + LastAsyncCallID int64 + LastExecution time.Time + NextExecution time.Time + Key model.CronJobKey +} + +func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error { + _, err := q.db.ExecContext(ctx, updateCronJobExecution, + arg.LastAsyncCallID, + arg.LastExecution, + arg.NextExecution, + arg.Key, + ) + return err +} + const updateEncryptionVerification = `-- name: UpdateEncryptionVerification :exec UPDATE encryption_keys SET verify_timeline = $1, @@ -2520,18 +2542,6 @@ func (q *Queries) UpdateEncryptionVerification(ctx context.Context, verifyTimeli return err } -const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec -UPDATE cron_jobs - SET last_execution = $1::TIMESTAMPTZ, - next_execution = $2::TIMESTAMPTZ - WHERE key = $3::cron_job_key -` - -func (q *Queries) UpdateCronJobExecution(ctx context.Context, lastExecution time.Time, nextExecution time.Time, key model.CronJobKey) error { - _, err := q.db.ExecContext(ctx, updateCronJobExecution, lastExecution, nextExecution, key) - return err -} - const upsertController = `-- name: UpsertController :one INSERT INTO controller (key, endpoint) VALUES ($1, $2) diff --git a/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql index 93615d0e6c..b67ba4ead9 100644 --- a/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql +++ b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql @@ -1,22 +1,14 @@ -- migrate:up -ALTER TABLE async_calls - ADD COLUMN cron_job_key cron_job_key; - -ALTER TABLE async_calls - ADD CONSTRAINT fk_async_calls_cron_job_key - FOREIGN KEY (cron_job_key) REFERENCES cron_jobs(key) - ON DELETE SET NULL; - -CREATE INDEX idx_async_calls_cron_job_key - ON async_calls (cron_job_key); - -CREATE INDEX idx_async_calls_cron_job_key_scheduled_at - ON async_calls (cron_job_key, scheduled_at); - ALTER TABLE cron_jobs DROP COLUMN state, - ADD COLUMN last_execution TIMESTAMPTZ; + ADD COLUMN last_execution TIMESTAMPTZ, + ADD COLUMN last_async_call_id BIGINT; + +ALTER TABLE cron_jobs + ADD CONSTRAINT fk_cron_jobs_last_async_call_id + FOREIGN KEY (last_async_call_id) REFERENCES async_calls(id) + ON DELETE SET NULL; DROP TYPE cron_job_state; diff --git a/internal/configuration/sql/models.go b/internal/configuration/sql/models.go index 42608bbe04..039df94a07 100644 --- a/internal/configuration/sql/models.go +++ b/internal/configuration/sql/models.go @@ -347,7 +347,6 @@ type AsyncCall struct { Catching bool ParentRequestKey optional.Option[string] TraceContext pqtype.NullRawMessage - CronJobKey optional.Option[model.CronJobKey] } type Controller struct { @@ -360,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - ModuleName string - LastExecution optional.Option[time.Time] + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/sqlc.yaml b/sqlc.yaml index 0a064974f1..e1ce6df811 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -206,6 +206,16 @@ sql: nullable: true go_type: type: "optional.Option[model.CronJobKey]" + - db_type: "encrypted_async" + go_type: "github.com/TBD54566975/ftl/internal/encryption.EncryptedAsyncColumn" + - db_type: "encrypted_async" + nullable: true + go_type: "github.com/TBD54566975/ftl/internal/encryption.OptionalEncryptedAsyncColumn" + - db_type: "encrypted_timeline" + go_type: "github.com/TBD54566975/ftl/internal/encryption.EncryptedTimelineColumn" + - db_type: "encrypted_timeline" + nullable: true + go_type: "github.com/TBD54566975/ftl/internal/encryption.OptionalEncryptedTimelineColumn" - db_type: "lease_key" go_type: "github.com/TBD54566975/ftl/backend/controller/leases.Key" - db_type: "lease_key"