From 8ce9fafd60953ca34a6d354b96eb1dd9fc305bde Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Fri, 23 Aug 2024 16:01:13 -0700 Subject: [PATCH] feat: rewrite cron to use async system (#2407) This refactor removes the cron job system's state management, cron execution, and hashring management, in favor of the async call system. Data tables changed as follows: - `cron_jobs` is still used to maintain the job list - Removes `state`, adds `last_execution::timestamptz` and `last_async_call_id::bigint` - `async_calls` now receives rows inserted by the cron system - `cron_jobs.last_async_call_id = async_calls.id` - Adds the notion of a `cron` origin in `async_calls.origin` After a deployment, all valid unscheduled cron jobs are scheduled; a row is added to `async_calls` with a `pending` state and `scheduled_at` is set to the job's next execution time. The corresponding row in `cron_jobs` is also updated with the time of the scheduled execution, the computed time of the execution after it, and the inserted async call ID. On completion of a cron async call, the next execution of that job is scheduled. Effectively, every cron job will have exactly one scheduled execution. 
Closes #2197 --------- Co-authored-by: Alec Thomas --- .golangci.yml | 3 +- Justfile | 2 +- backend/controller/controller.go | 25 +- backend/controller/cronjobs/cronjobs.go | 472 ++++-------------- .../cronjobs/cronjobs_integration_test.go | 32 +- backend/controller/cronjobs/cronjobs_test.go | 206 ++++---- .../cronjobs/cronjobs_utils_test.go | 263 ---------- backend/controller/cronjobs/dal/dal.go | 147 +++--- .../cronjobs/sql/async_queries.sql.go | 89 ++++ backend/controller/cronjobs/sql/conn.go | 73 +++ backend/controller/cronjobs/sql/models.go | 61 +-- backend/controller/cronjobs/sql/querier.go | 11 +- backend/controller/cronjobs/sql/queries.sql | 76 ++- .../controller/cronjobs/sql/queries.sql.go | 266 ++++------ backend/controller/cronjobs/state.go | 83 --- backend/controller/dal/async_calls.go | 24 +- backend/controller/dal/async_calls_test.go | 27 + backend/controller/dal/fsm.go | 1 + backend/controller/dal/pubsub.go | 1 + .../controller/observability/async_calls.go | 5 + backend/controller/sql/async_queries.sql | 31 ++ backend/controller/sql/async_queries.sql.go | 89 ++++ backend/controller/sql/models.go | 61 +-- backend/controller/sql/querier.go | 8 +- backend/controller/sql/queries.sql | 30 -- backend/controller/sql/queries.sql.go | 410 +++++---------- ...0240815164808_async_calls_cron_job_key.sql | 16 + go.mod | 1 - go.sum | 2 - internal/configuration/sql/models.go | 61 +-- internal/model/cron_job.go | 11 +- sqlc.yaml | 7 +- 32 files changed, 992 insertions(+), 1602 deletions(-) delete mode 100644 backend/controller/cronjobs/cronjobs_utils_test.go create mode 100644 backend/controller/cronjobs/sql/async_queries.sql.go delete mode 100644 backend/controller/cronjobs/state.go create mode 100644 backend/controller/sql/async_queries.sql create mode 100644 backend/controller/sql/async_queries.sql.go create mode 100644 backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql diff --git a/.golangci.yml b/.golangci.yml index 7ddeb6d1ec..1f069c05f9 
100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -136,4 +136,5 @@ issues: - "fmt.Errorf can be replaced with errors.New" - "fmt.Sprintf can be replaced with string concatenation" - "strings.Title has been deprecated" - - "error returned from external package is unwrapped.*TranslatePGError" \ No newline at end of file + - "error returned from external package is unwrapped.*TranslatePGError" + - "struct literal uses unkeyed fields" \ No newline at end of file diff --git a/Justfile b/Justfile index e01003d985..91d7878abb 100644 --- a/Justfile +++ b/Justfile @@ -75,7 +75,7 @@ init-db: # Regenerate SQLC code (requires init-db to be run first) build-sqlc: - @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate" + @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate" # Build the ZIP files that are embedded in the FTL release binaries build-zips: build-kt-runtime diff --git a/backend/controller/controller.go b/backend/controller/controller.go index ddd8f39ccf..1ba66372d8 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -249,9 +249,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling svc.routes.Store(map[string][]dal.Route{}) 
svc.schema.Store(&schema.Schema{}) - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn) svc.cronJobs = cronSvc - svc.controllerListListeners = append(svc.controllerListListeners, cronSvc) pubSub := pubsub.New(ctx, db, svc.tasks, svc) svc.pubSub = pubSub @@ -541,7 +540,10 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re } } - s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey) + err = s.cronJobs.CreatedOrReplacedDeloyment(ctx) + if err != nil { + return nil, fmt.Errorf("could not schedule cron jobs: %w", err) + } return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil } @@ -1403,7 +1405,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration logger.Tracef("No async calls to execute") return time.Second * 2, nil } else if err != nil { - observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err) + if call == nil { + observability.AsyncCalls.AcquireFailed(ctx, err) + } else { + observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err) + } return 0, err } @@ -1430,6 +1436,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration if returnErr == nil { // Post-commit notification based on origin switch origin := call.Origin.(type) { + case dal.AsyncOriginCron: + break + case dal.AsyncOriginFSM: break @@ -1568,6 +1577,9 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata { switch origin := call.Origin.(type) { + case dal.AsyncOriginCron: + return &ftlv1.Metadata{} + case dal.AsyncOriginFSM: return &ftlv1.Metadata{ Values: []*ftlv1.Metadata_Pair{ @@ -1595,6 +1607,11 @@ func (s 
*Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A // Allow for handling of completion based on origin switch origin := call.Origin.(type) { + case dal.AsyncOriginCron: + if err := s.cronJobs.OnJobCompletion(ctx, origin.CronJobKey, failed); err != nil { + return fmt.Errorf("failed to finalize cron async call: %w", err) + } + case dal.AsyncOriginFSM: if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil { return fmt.Errorf("failed to finalize FSM async call: %w", err) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index c892c7000f..2f1a9efcd2 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -3,126 +3,44 @@ package cronjobs import ( "context" "database/sql" - "encoding/json" "errors" "fmt" - "time" - "connectrpc.com/connect" - "github.com/alecthomas/atomic" - "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/pubsub" "github.com/benbjohnson/clock" - "github.com/jpillora/backoff" - "github.com/serialx/hashring" "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" + cronsql "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" - "github.com/TBD54566975/ftl/backend/controller/observability" - "github.com/TBD54566975/ftl/backend/controller/scheduledtask" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/slices" ) -const ( - controllersPerJob = 2 - jobResetInterval = time.Minute - newJobHashRingOverrideInterval = time.Minute + time.Second*20 -) - -type Config struct { - Timeout time.Duration -} - -//sumtype:decl -type 
event interface { - // cronJobEvent is a marker to ensure that all events implement the interface. - cronJobEvent() -} - -type syncEvent struct { - jobs []model.CronJob - addedDeploymentKey optional.Option[model.DeploymentKey] -} - -func (syncEvent) cronJobEvent() {} - -type endedJobsEvent struct { - jobs []model.CronJob -} - -func (endedJobsEvent) cronJobEvent() {} - -type updatedHashRingEvent struct{} - -func (updatedHashRingEvent) cronJobEvent() {} - -type hashRingState struct { - hashRing *hashring.HashRing - controllers []parentdal.Controller - idx int -} - -type DAL interface { - GetCronJobs(ctx context.Context) ([]model.CronJob, error) - StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) - EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) - GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) -} - -type Scheduler interface { - Singleton(retry backoff.Backoff, job scheduledtask.Job) - Parallel(retry backoff.Backoff, job scheduledtask.Job) -} - -type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) - type Service struct { - config Config key model.ControllerKey requestSource string - - dal DAL - scheduler Scheduler - call ExecuteCallFunc - - clock clock.Clock - events *pubsub.Topic[event] - - hashRingState atomic.Value[*hashRingState] + dal dal.DAL + clock clock.Clock } -func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, conn *sql.DB, scheduler Scheduler, call ExecuteCallFunc) *Service { - return NewForTesting(ctx, key, requestSource, config, dal.New(conn), scheduler, call, clock.New()) +func New(ctx context.Context, key model.ControllerKey, requestSource string, conn *sql.DB) *Service { + return NewForTesting(ctx, key, requestSource, 
*dal.New(conn), clock.New()) } -func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { +func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, dal dal.DAL, clock clock.Clock) *Service { svc := &Service{ - config: config, key: key, requestSource: requestSource, dal: dal, - scheduler: scheduler, - call: call, clock: clock, - events: pubsub.New[event](), } - svc.UpdatedControllerList(ctx, nil) - - svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.syncJobs) - svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs) - - go svc.watchForUpdates(ctx) - return svc } func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) { + logger := log.FromContext(ctx).Scope("cron") start := s.clock.Now().UTC() newJobs := []model.CronJob{} merr := []error{} @@ -153,344 +71,132 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod Schedule: cronStr, StartTime: start, NextExecution: next, - State: model.CronJobStateIdle, // DeploymentKey: Filled in by DAL }) } } + logger.Tracef("Found %d cron jobs", len(newJobs)) if len(merr) > 0 { return nil, errors.Join(merr...) } return newJobs, nil } -// CreatedOrReplacedDeloyment is only called by the responsible controller to its cron service, and will not be received by the other cron services. -// When a controller creates/replaces a deployment, its cron job service is responsible for -// the newly created cron jobs until other controllers have a chance to resync their list of jobs and start sharing responsibility of the new cron jobs. 
-func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { - // Rather than finding old/new cron jobs and updating our state, we can just resync the list of jobs - _ = s.syncJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) //nolint:errcheck // TODO(matt2e) is this valid? -} - -// SyncJobs is run periodically via a scheduled task -func (s *Service) syncJobs(ctx context.Context) (time.Duration, error) { - err := s.syncJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) - if err != nil { - return 0, err - } - return jobResetInterval, nil -} - -// syncJobsWithNewDeploymentKey resyncs the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time. -func (s *Service) syncJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error { - logger := log.FromContext(ctx) - - jobs, err := s.dal.GetCronJobs(ctx) +// CreatedOrReplacedDeloyment is called by the responsible controller to its cron service, we can +// schedule all cron jobs here since the cron_jobs rows are locked within the transaction and the +// controllers won't step on each other. 
+func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) error { + logger := log.FromContext(ctx).Scope("cron") + logger.Tracef("New deployment; scheduling cron jobs") + err := s.scheduleCronJobs(ctx) if err != nil { - logger.Errorf(err, "failed to get cron jobs") - return fmt.Errorf("failed to get cron jobs: %w", err) + return fmt.Errorf("failed to schedule cron jobs: %w", err) } - s.events.Publish(syncEvent{ - jobs: jobs, - addedDeploymentKey: deploymentKey, - }) return nil } -func (s *Service) executeJob(ctx context.Context, job model.CronJob) { - logger := log.FromContext(ctx) - requestBody := map[string]any{} - requestJSON, err := json.Marshal(requestBody) - if err != nil { - logger.Errorf(err, "could not build body for cron job: %v", job.Key) - observability.Cron.JobFailedStart(ctx, job) - return - } - - req := connect.NewRequest(&ftlv1.CallRequest{ - Verb: &schemapb.Ref{Module: job.Verb.Module, Name: job.Verb.Name}, - Body: requestJSON, - }) - - requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Verb.Module, job.Verb.Name)) - - callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) - defer cancel() - observability.Cron.JobStarted(ctx, job) - _, err = s.call(callCtx, req, optional.Some(requestKey), optional.None[model.RequestKey](), s.requestSource) - - // Record execution success/failure metric now and leave post job-execution-action observability to logging - if err != nil { - logger.Errorf(err, "failed to execute cron job %v", job.Key) - observability.Cron.JobFailed(ctx, job) - // Do not return, continue to end the job and schedule the next execution - } else { - observability.Cron.JobSuccess(ctx, job) - } - - schedule, err := cron.Parse(job.Schedule) - if err != nil { - logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule) - return - } - next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false) - if err != nil { - logger.Errorf(err, "failed to calculate next execution for cron job %v with 
schedule %q", job.Key, job.Schedule) - return - } +// scheduleCronJobs schedules all cron jobs that are not already scheduled. +func (s *Service) scheduleCronJobs(ctx context.Context) (err error) { + logger := log.FromContext(ctx).Scope("cron") + now := s.clock.Now().UTC() - updatedJob, err := s.dal.EndCronJob(ctx, job, next) + tx, err := s.dal.Begin(ctx) if err != nil { - logger.Errorf(err, "failed to end cron job %v", job.Key) - } else { - s.events.Publish(endedJobsEvent{ - jobs: []model.CronJob{updatedJob}, - }) + return fmt.Errorf("failed to begin transaction: %w", err) } -} + defer tx.CommitOrRollback(ctx, &err) -// killOldJobs looks for jobs that have been executing for too long. -// A soft timeout should normally occur from the job's context timing out, but there are cases where this does not happen (eg: unresponsive or dead controller) -// In these cases we need a hard timout after an additional grace period. -// To do this, this function resets these job's state to idle and updates the next execution time in the db so the job can be picked up again next time. 
-func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { - logger := log.FromContext(ctx) - staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) + jobs, err := tx.GetUnscheduledCronJobs(ctx, now) if err != nil { - return 0, err - } else if len(staleJobs) == 0 { - return time.Minute, nil + return fmt.Errorf("failed to get unscheduled cron jobs: %w", err) } - - updatedJobs := []model.CronJob{} - for _, stale := range staleJobs { - start := s.clock.Now().UTC() - pattern, err := cron.Parse(stale.Schedule) + logger.Tracef("Scheduling %d cron jobs", len(jobs)) + for _, job := range jobs { + err = s.scheduleCronJob(ctx, tx, job) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule) - continue - } - next, err := cron.NextAfter(pattern, start, false) - if err != nil { - logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule) - continue + return fmt.Errorf("failed to schedule cron job %q: %w", job.Key, err) } - - updated, err := s.dal.EndCronJob(ctx, stale, next) - if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Key, err) - continue - } - logger.Warnf("Killed stale cron job %s", stale.Key) - observability.Cron.JobKilled(ctx, stale) - updatedJobs = append(updatedJobs, updated) } - s.events.Publish(endedJobsEvent{ - jobs: updatedJobs, - }) - - return time.Minute, nil + return nil } -// watchForUpdates is the centralized place that handles: -// - the list of known jobs and their state -// - executing jobs when they are due -// - reacting to events that change the list of jobs, deployments or hash ring -// -// State is private to this function to ensure thread safety. -func (s *Service) watchForUpdates(ctx context.Context) { +// OnJobCompletion is called by the controller when a cron job async call completes. 
We schedule +// the next execution of the cron job here. +func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, failed bool) (err error) { logger := log.FromContext(ctx).Scope("cron") + logger.Tracef("Cron job %q completed with failed=%v", key, failed) - events := make(chan event, 128) - s.events.Subscribe(events) - defer s.events.Unsubscribe(events) - - state := &state{ - executing: map[string]bool{}, - newJobs: map[string]time.Time{}, - blockedUntil: s.clock.Now(), - } - - for { - now := s.clock.Now() - next := now.Add(time.Hour) // should never be reached, expect a different signal long beforehand - for _, j := range state.jobs { - if possibleNext, err := s.nextAttemptForJob(j, state, now, false); err == nil && possibleNext.Before(next) { - next = possibleNext - } - } - - if next.Before(state.blockedUntil) { - next = state.blockedUntil - logger.Tracef("loop blocked for %v", next.Sub(now)) - } else if next.Sub(now) < time.Second { - next = now.Add(time.Second) - logger.Tracef("loop while gated for 1s") - } else if next.Sub(now) > time.Minute*59 { - logger.Tracef("loop while idling") - } else { - logger.Tracef("loop with next %v, %d jobs", next.Sub(now), len(state.jobs)) - } - - select { - case <-ctx.Done(): - return - case <-s.clock.After(next.Sub(now)): - // Try starting jobs in db - // note that we use next here are the current time - // as if there is a pause of over a second we could miss jobs if we use the current time - // this is very unlikely to happen, but if it did it would be hard to diagnose - jobsToAttempt := slices.Filter(state.jobs, func(j model.CronJob) bool { - if n, err := s.nextAttemptForJob(j, state, next, true); err == nil { - return !n.After(s.clock.Now().UTC()) - } - return false - }) - jobResults, err := s.dal.StartCronJobs(ctx, jobsToAttempt) - if err != nil { - logger.Errorf(err, "failed to start cron jobs in db") - state.blockedUntil = s.clock.Now().Add(time.Second * 5) - continue - } - - // Start jobs that were 
successfully updated - updatedJobs := []model.CronJob{} - removedDeploymentKeys := map[string]model.DeploymentKey{} - - for _, job := range jobResults { - updatedJobs = append(updatedJobs, job.CronJob) - if !job.DidStartExecution { - continue - } - if !job.HasMinReplicas { - // We successfully updated the db to start this job but the deployment has min replicas set to 0 - // We need to update the db to end this job - removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey - _, err := s.dal.EndCronJob(ctx, job.CronJob, next) - if err != nil { - logger.Errorf(err, "failed to end cron job %s", job.Key.String()) - } - continue - } - logger.Infof("executing job %v", job.Key) - state.startedExecutingJob(job.CronJob) - go s.executeJob(ctx, job.CronJob) - } - - // Update job list - state.updateJobs(updatedJobs) - for _, key := range removedDeploymentKeys { - state.removeDeploymentKey(key) - } - case e := <-events: - switch event := e.(type) { - case syncEvent: - logger.Tracef("syncing job list: %d jobs", len(event.jobs)) - state.sync(event.jobs, event.addedDeploymentKey) - case endedJobsEvent: - logger.Tracef("updating %d jobs", len(event.jobs)) - state.updateJobs(event.jobs) - case updatedHashRingEvent: - // do another cycle through the loop to see if new jobs need to be scheduled - } - } + tx, err := s.dal.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) } -} + defer tx.CommitOrRollback(ctx, &err) -func (s *Service) nextAttemptForJob(job model.CronJob, state *state, currentTime time.Time, allowsNow bool) (time.Time, error) { - currentTime = currentTime.UTC() - if !s.isResponsibleForJob(job, state) { - return s.clock.Now(), fmt.Errorf("controller is not responsible for job") + job, err := tx.GetCronJobByKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to get cron job %q: %w", key, err) } - if job.State == model.CronJobStateExecuting { - if state.isExecutingInCurrentController(job) { - // no need to schedule 
this job until it finishes - return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") - } - // We don't know when the other controller that is executing this job will finish it - // So we should optimistically attempt it when the next execution date is due assuming the job finishes - pattern, err := cron.Parse(job.Schedule) - if err != nil { - return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule) - } - next, err := cron.NextAfter(pattern, currentTime, allowsNow) - if err == nil { - return next, nil - } + err = s.scheduleCronJob(ctx, tx, job) + if err != nil { + return fmt.Errorf("failed to schedule cron job %q: %w", key, err) } - return job.NextExecution, nil + return nil } -// UpdatedControllerList synchronises the hash ring with the active controllers. -func (s *Service) UpdatedControllerList(ctx context.Context, controllers []parentdal.Controller) { +// scheduleCronJob schedules the next execution of a single cron job. +func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.Tx, job model.CronJob) error { logger := log.FromContext(ctx).Scope("cron") - controllerIdx := -1 - for idx, controller := range controllers { - if controller.Key.String() == s.key.String() { - controllerIdx = idx - break - } + now := s.clock.Now().UTC() + pending, err := tx.IsCronJobPending(ctx, job.Key, now) + if err != nil { + return fmt.Errorf("failed to check if cron job %q is pending: %w", job.Key, err) } - if controllerIdx == -1 { - logger.Tracef("controller %q not found in list of controllers", s.key) + if pending { + logger.Tracef("Attempt to schedule cron job %q which is already pending", job.Key) + return nil } - oldState := s.hashRingState.Load() - if oldState != nil && len(oldState.controllers) == len(controllers) { - hasChanged := false - for idx, new := range controllers { - old := oldState.controllers[idx] - if new.Key.String() != old.Key.String() { - hasChanged = true - break - } - } - if !hasChanged { - return - 
} + pattern, err := cron.Parse(job.Schedule) + if err != nil { + return fmt.Errorf("failed to parse cron schedule %q: %w", job.Schedule, err) } - - hashRing := hashring.New(slices.Map(controllers, func(c parentdal.Controller) string { return c.Key.String() })) - s.hashRingState.Store(&hashRingState{ - hashRing: hashRing, - controllers: controllers, - idx: controllerIdx, - }) - - s.events.Publish(updatedHashRingEvent{}) -} - -// isResponsibleForJob indicates whether a this service should be responsible for attempting jobs, -// or if enough other controllers will handle it. This allows us to spread the job load across controllers. -func (s *Service) isResponsibleForJob(job model.CronJob, state *state) bool { - if state.isJobTooNewForHashRing(job) { - return true + originTime := job.StartTime + if t, ok := job.LastExecution.Get(); ok { + originTime = t } - hashringState := s.hashRingState.Load() - if hashringState == nil { - return true + nextAttemptForJob, err := cron.NextAfter(pattern, originTime, false) + if err != nil { + return fmt.Errorf("failed to calculate next execution for cron job %q with schedule %q: %w", job.Key, job.Schedule, err) } - - initialKey, ok := hashringState.hashRing.GetNode(job.Key.String()) - if !ok { - return true + if nextAttemptForJob.Before(now) { + nextAttemptForJob = now } - initialIdx := -1 - for idx, controller := range hashringState.controllers { - if controller.Key.String() == initialKey { - initialIdx = idx - break - } - } - if initialIdx == -1 { - return true + logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) + origin := &parentdal.AsyncOriginCron{CronJobKey: job.Key} + id, err := tx.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{ + ScheduledAt: nextAttemptForJob, + Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, + Origin: origin.String(), + Request: []byte(`{}`), + }) + if err != nil { + return fmt.Errorf("failed to create async call for job %q: %w", job.Key, 
err) } - - if initialIdx+controllersPerJob > len(hashringState.controllers) { - // wraps around - return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerJob)-len(hashringState.controllers) + futureAttemptForJob, err := cron.NextAfter(pattern, nextAttemptForJob, false) + if err != nil { + return fmt.Errorf("failed to calculate future execution for cron job %q with schedule %q: %w", job.Key, job.Schedule, err) + } + logger.Tracef("Updating cron job %q with last attempt at %s and next attempt at %s", job.Key, nextAttemptForJob, futureAttemptForJob) + err = tx.UpdateCronJobExecution(ctx, cronsql.UpdateCronJobExecutionParams{ + LastAsyncCallID: id, + LastExecution: nextAttemptForJob, + NextExecution: futureAttemptForJob, + Key: job.Key, + }) + if err != nil { + return fmt.Errorf("failed to update cron job %q: %w", job.Key, err) } - return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerJob + return nil } diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go index 34ed96fc75..1b65cdb952 100644 --- a/backend/controller/cronjobs/cronjobs_integration_test.go +++ b/backend/controller/cronjobs/cronjobs_integration_test.go @@ -3,46 +3,16 @@ package cronjobs import ( - "context" "os" "path/filepath" "testing" "time" "github.com/alecthomas/assert/v2" - "github.com/benbjohnson/clock" - db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" - parentdb "github.com/TBD54566975/ftl/backend/controller/dal" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" - "github.com/TBD54566975/ftl/internal/encryption" in "github.com/TBD54566975/ftl/internal/integration" - "github.com/TBD54566975/ftl/internal/log" ) -func TestServiceWithRealDal(t *testing.T) { - t.Parallel() - ctx := log.ContextWithNewDefaultLogger(context.Background()) - ctx, cancel := context.WithCancel(ctx) - t.Cleanup(cancel) - - conn := sqltest.OpenForTesting(ctx, t) - 
dal := db.New(conn) - parentDAL, err := parentdb.New(ctx, conn, encryption.NewBuilder()) - assert.NoError(t, err) - - // Using a real clock because real db queries use db clock - // delay until we are on an odd second - clk := clock.New() - if clk.Now().Second()%2 == 0 { - time.Sleep(time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond) - } else { - time.Sleep(2*time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond) - } - - testServiceWithDal(ctx, t, dal, parentDAL, clk) -} - func TestCron(t *testing.T) { dir := t.TempDir() // Due to some MacOS magic, /tmp differs between this test code and the @@ -57,6 +27,8 @@ func TestCron(t *testing.T) { in.WithLanguages("go", "java"), in.CopyModule("cron"), in.Deploy("cron"), + in.Wait("cron"), + in.Sleep(1*time.Second), func(t testing.TB, ic in.TestContext) { _, err := os.Stat(tmpFile) assert.NoError(t, err) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 531617779d..7ff9c6052f 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -2,28 +2,27 @@ package cronjobs import ( "context" - "sync" + "fmt" + "strconv" "testing" "time" - "connectrpc.com/connect" "github.com/alecthomas/assert/v2" - "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/either" "github.com/benbjohnson/clock" - xslices "golang.org/x/exp/slices" - db "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" + parentdal "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" 
"github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/slices" ) -func TestServiceWithMockDal(t *testing.T) { - t.Skip("TODO: sometimes blocks on CI. Discussion in issue #1368") +func TestNewCronJobsForModule(t *testing.T) { t.Parallel() ctx := log.ContextWithNewDefaultLogger(context.Background()) ctx, cancel := context.WithCancel(ctx) @@ -32,98 +31,123 @@ func TestServiceWithMockDal(t *testing.T) { clk := clock.NewMock() clk.Add(time.Second) // half way between cron job executions - mockDal := &mockDAL{ - clock: clk, - lock: sync.Mutex{}, - attemptCountMap: map[string]int{}, - } + key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) conn := sqltest.OpenForTesting(ctx, t) - parentDAL, err := db.New(ctx, conn, encryption.NewBuilder()) + dal := dal.New(conn) + parentDAL, err := parentdal.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - - testServiceWithDal(ctx, t, mockDal, parentDAL, clk) -} - -func TestHashRing(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - // This test uses multiple mock clocks to progress time for each controller individually - // This allows us to compare attempts for each cron job and know which controller attempted it - t.Parallel() - ctx := log.ContextWithNewDefaultLogger(context.Background()) - ctx, cancel := context.WithCancel(ctx) - t.Cleanup(cancel) - - mockDal := &mockDAL{ - clock: clock.NewMock(), - lock: sync.Mutex{}, - attemptCountMap: map[string]int{}, - } moduleName := "initial" - jobsToCreate := newJobs(t, moduleName, "*/10 * * * * * *", mockDal.clock, 100) + jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute - deploymentKey, err := mockDal.CreateDeployment(ctx, "go", &schema.Module{ + deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{ Name: moduleName, - }, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate) + }, []parentdal.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, 
jobsToCreate) + assert.NoError(t, err) + err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1) assert.NoError(t, err) - err = mockDal.ReplaceDeployment(ctx, deploymentKey, 1) + // Progress so that start_time is valid + clk.Add(time.Second) + cjs := NewForTesting(ctx, key, "test.com", *dal, clk) + // All jobs need to be scheduled + expectUnscheduledJobs(t, dal, clk, 2) + unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now()) assert.NoError(t, err) + assert.Equal(t, len(unscheduledJobs), 2) + + // No async calls yet + _, err = parentDAL.AcquireAsyncCall(ctx) + assert.IsError(t, err, dalerrs.ErrNotFound) + assert.EqualError(t, err, "no pending async calls: not found") - controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { - return &connect.Response[ftlv1.CallResponse]{}, nil - }) - - // This should give time for each controller to start watching its own mock clock - // If we don;t wait here, we might hit a race condition outlined in issue #1368 - time.Sleep(time.Millisecond * 100) - - // progress time for each controller one at a time, noting which verbs got attempted each time - // to build a map of verb to controller keys - controllersForVerbs := map[string][]model.ControllerKey{} - for _, c := range controllers { - mockDal.lock.Lock() - beforeAttemptCount := map[string]int{} - for k, v := range mockDal.attemptCountMap { - beforeAttemptCount[k] = v - } - mockDal.lock.Unlock() - - c.mockClock.Add(time.Second * 15) - time.Sleep(time.Millisecond * 100) - - mockDal.lock.Lock() - for k, v := range mockDal.attemptCountMap { - if beforeAttemptCount[k] == v { - continue - } - controllersForVerbs[k] = append(controllersForVerbs[k], c.key) - } - mockDal.lock.Unlock() + err = cjs.scheduleCronJobs(ctx) + assert.NoError(t, err) 
+ expectUnscheduledJobs(t, dal, clk, 0) + for _, job := range jobsToCreate { + j, err := dal.GetCronJobByKey(ctx, job.Key) + assert.NoError(t, err) + assert.Equal(t, job.StartTime, j.StartTime) + assert.Equal(t, j.NextExecution, clk.Now().Add(time.Second)) + + p, err := dal.IsCronJobPending(ctx, job.Key, job.StartTime) + assert.NoError(t, err) + assert.True(t, p) + } + // Now there should be async calls + calls := []*parentdal.AsyncCall{} + for i, job := range jobsToCreate { + call, err := parentDAL.AcquireAsyncCall(ctx) + assert.NoError(t, err) + assert.Equal(t, call.Verb, job.Verb.ToRefKey()) + assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) + assert.Equal(t, call.Request, []byte("{}")) + assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue + + p, err := dal.IsCronJobPending(ctx, job.Key, job.StartTime) + assert.NoError(t, err) + assert.False(t, p) + + calls = append(calls, call) + } + clk.Add(time.Second) + expectUnscheduledJobs(t, dal, clk, 0) + // Complete all calls + for _, call := range calls { + callResult := either.LeftOf[string]([]byte("{}")) + _, err = parentDAL.CompleteAsyncCall(ctx, call, callResult, func(tx *parentdal.Tx, isFinalResult bool) error { + return nil + }) + assert.NoError(t, err) + } + clk.Add(time.Second) + expectUnscheduledJobs(t, dal, clk, 2) + // Use the completion handler to schedule the next execution + for _, call := range calls { + origin, ok := call.Origin.(parentdal.AsyncOriginCron) + assert.True(t, ok) + err = cjs.OnJobCompletion(ctx, origin.CronJobKey, false) + assert.NoError(t, err) } + expectUnscheduledJobs(t, dal, clk, 0) + for i, job := range jobsToCreate { + call, err := parentDAL.AcquireAsyncCall(ctx) + assert.NoError(t, err) + assert.Equal(t, call.Verb, job.Verb.ToRefKey()) + assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) + assert.Equal(t, call.Request, []byte("{}")) + assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling 
down queue + + assert.Equal(t, call.ScheduledAt, clk.Now()) + + p, err := dal.IsCronJobPending(ctx, job.Key, job.StartTime) + assert.NoError(t, err) + assert.False(t, p) + } +} + +func expectUnscheduledJobs(t *testing.T, dal *dal.DAL, clk *clock.Mock, count int) { + t.Helper() + unscheduledJobs, err := dal.GetUnscheduledCronJobs(context.Background(), clk.Now()) + assert.NoError(t, err) + assert.Equal(t, len(unscheduledJobs), count) +} - // Check if each job has the same key list - // Theoretically this is is possible for all jobs to have the same assigned controllers, but with 100 jobs and 20 controllers, this is unlikely - keys := []string{} - hasFoundNonMatchingKeys := false - for v, k := range controllersForVerbs { - assert.Equal(t, len(k), 2, "expected verb %s to be attempted by 2 controllers", v) - - kStrs := slices.Map(k, func(k model.ControllerKey) string { return k.String() }) - xslices.Sort(kStrs) - if len(keys) == 0 { - keys = kStrs - continue - } - - if hasFoundNonMatchingKeys == false { - for keyIdx, keyStr := range kStrs { - if keys[keyIdx] != keyStr { - hasFoundNonMatchingKeys = true - } - } - } +func newCronJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Clock, count int) []model.CronJob { + t.Helper() + newJobs := []model.CronJob{} + for i := range count { + now := clock.Now() + pattern, err := cron.Parse(cronPattern) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + newJobs = append(newJobs, model.CronJob{ + Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%dCron", i)), + Verb: schema.Ref{Module: moduleName, Name: fmt.Sprintf("verb%dCron", i)}, + Schedule: pattern.String(), + StartTime: now, + NextExecution: next, + }) } - assert.True(t, hasFoundNonMatchingKeys, "expected at least one verb to have different controllers assigned") + return newJobs } diff --git a/backend/controller/cronjobs/cronjobs_utils_test.go b/backend/controller/cronjobs/cronjobs_utils_test.go 
deleted file mode 100644 index 5e9d64bd59..0000000000 --- a/backend/controller/cronjobs/cronjobs_utils_test.go +++ /dev/null @@ -1,263 +0,0 @@ -package cronjobs - -import ( - "context" - "fmt" - "strconv" - "sync" - "testing" - "time" - - "connectrpc.com/connect" - "github.com/alecthomas/assert/v2" - "github.com/alecthomas/types/optional" - "github.com/benbjohnson/clock" - "github.com/jpillora/backoff" - - cronjobsdb "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" - parentdb "github.com/TBD54566975/ftl/backend/controller/dal" - "github.com/TBD54566975/ftl/backend/controller/scheduledtask" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/cron" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/slices" -) - -type ParentDAL interface { - CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) - ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) -} - -type mockDAL struct { - lock sync.Mutex - clock clock.Clock - jobs []model.CronJob - attemptCountMap map[string]int -} - -var _ ParentDAL = &mockDAL{} -var _ DAL = &mockDAL{} - -func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []parentdb.DeploymentArtefact, ingressRoutes []parentdb.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { - deploymentKey := model.NewDeploymentKey(moduleSchema.Name) - d.jobs = []model.CronJob{} - for _, job := range cronJobs { - job.DeploymentKey = deploymentKey - d.jobs = append(d.jobs, job) - } - return deploymentKey, nil -} - -func (d *mockDAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, 
minReplicas int) (err error) { - return nil -} - -func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - return d.jobs, nil -} - -func (d *mockDAL) indexForJob(job model.CronJob) (int, error) { - for i, j := range d.jobs { - if j.Key.String() == job.Key.String() { - return i, nil - } - } - return -1, fmt.Errorf("job not found") -} - -func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []cronjobsdb.AttemptedCronJob, err error) { - d.lock.Lock() - defer d.lock.Unlock() - - attemptedJobs = []cronjobsdb.AttemptedCronJob{} - now := d.clock.Now() - - for _, inputJob := range jobs { - i, err := d.indexForJob(inputJob) - if err != nil { - return nil, err - } - job := d.jobs[i] - if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle { - job.State = model.CronJobStateExecuting - job.StartTime = d.clock.Now() - d.jobs[i] = job - attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{ - CronJob: job, - DidStartExecution: true, - HasMinReplicas: true, - }) - } else { - attemptedJobs = append(attemptedJobs, cronjobsdb.AttemptedCronJob{ - CronJob: job, - DidStartExecution: false, - HasMinReplicas: true, - }) - } - d.attemptCountMap[job.Key.String()]++ - } - return attemptedJobs, nil -} - -func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - i, err := d.indexForJob(job) - if err != nil { - return model.CronJob{}, err - } - internalJob := d.jobs[i] - if internalJob.State != model.CronJobStateExecuting { - return model.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") - } - if internalJob.StartTime != job.StartTime { - return model.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") - } - internalJob.State = model.CronJobStateIdle - internalJob.NextExecution = next - d.jobs[i] = internalJob - return internalJob, 
nil -} - -func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - return slices.Filter(d.jobs, func(job model.CronJob) bool { - return d.clock.Now().After(job.StartTime.Add(duration)) - }), nil -} - -type mockScheduler struct { -} - -func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { - // do nothing -} - -func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { - // do nothing -} - -type controller struct { - key model.ControllerKey - dal DAL - clock clock.Clock - mockClock *clock.Mock // only set when clock is a mock - cronJobs *Service -} - -func newJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Clock, count int) []model.CronJob { - t.Helper() - newJobs := []model.CronJob{} - for i := range count { - now := clock.Now() - pattern, err := cron.Parse(cronPattern) - assert.NoError(t, err) - next, err := cron.NextAfter(pattern, now, false) - assert.NoError(t, err) - newJobs = append(newJobs, model.CronJob{ - Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%d", i)), - Verb: schema.Ref{Module: moduleName, Name: fmt.Sprintf("verb%d", i)}, - Schedule: pattern.String(), - StartTime: now, - NextExecution: next, - State: model.CronJobStateIdle, - }) - } - return newJobs -} - -func newControllers(ctx context.Context, count int, dal DAL, clockFactory func() clock.Clock, call ExecuteCallFunc) []*controller { - controllers := []*controller{} - for i := range count { - key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) - clk := clockFactory() - controller := &controller{ - key: key, - dal: dal, - clock: clk, - cronJobs: NewForTesting(ctx, - key, "test.com", - Config{Timeout: time.Minute * 5}, - dal, - &mockScheduler{}, - call, - clk), - } - if mockClock, ok := clk.(*clock.Mock); ok { - controller.mockClock = mockClock - } - controllers = append(controllers, controller) - } - - 
time.Sleep(time.Millisecond * 100) - - for _, c := range controllers { - s := c.cronJobs - go func() { - s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) parentdb.Controller { - return parentdb.Controller{ - Key: ctrl.key, - } - })) - _, _ = s.syncJobs(ctx) //nolint:errcheck - }() - } - - time.Sleep(time.Millisecond * 100) - - return controllers -} - -// should be called when clk is half way between cron job executions (ie on an odd second) -func testServiceWithDal(ctx context.Context, t *testing.T, dal DAL, parentDAL ParentDAL, clk clock.Clock) { - t.Helper() - - verbCallCount := map[string]int{} - verbCallCountLock := sync.Mutex{} - - moduleName := "initial" - jobsToCreate := newJobs(t, moduleName, "*/2 * * * * * *", clk, 20) - - deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{ - Name: moduleName, - }, []parentdb.DeploymentArtefact{}, []parentdb.IngressRoutingEntry{}, jobsToCreate) - assert.NoError(t, err) - - err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1) - assert.NoError(t, err) - - _ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { - verbRef := schema.RefFromProto(r.Msg.Verb) - - verbCallCountLock.Lock() - verbCallCount[verbRef.Name]++ - verbCallCountLock.Unlock() - - return &connect.Response[ftlv1.CallResponse]{}, nil - }) - - if mockClock, ok := clk.(*clock.Mock); ok { - // We don't need to wait in real-time - time.Sleep(time.Millisecond * 100) - for range 3 { - mockClock.Add(time.Second * 2) - time.Sleep(time.Millisecond * 100) - } - } else { - time.Sleep(time.Second * 2 * 3) - } - - verbCallCountLock.Lock() - for _, j := range jobsToCreate { - count := verbCallCount[j.Verb.Name] - assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) - } - 
verbCallCountLock.Unlock() -} diff --git a/backend/controller/cronjobs/dal/dal.go b/backend/controller/cronjobs/dal/dal.go index 9499717fe6..e75d278155 100644 --- a/backend/controller/cronjobs/dal/dal.go +++ b/backend/controller/cronjobs/dal/dal.go @@ -1,4 +1,3 @@ -// Package dal provides a data abstraction layer for cron jobs package dal import ( @@ -6,8 +5,10 @@ import ( "fmt" "time" + "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/controller/observability" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" @@ -22,80 +23,114 @@ func New(conn sql.ConnI) *DAL { return &DAL{db: sql.NewDB(conn)} } -func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob { - return model.CronJob{ - Key: row.Key, - DeploymentKey: row.DeploymentKey, - Verb: schema.Ref{Module: row.Module, Name: row.Verb}, - Schedule: row.Schedule, - StartTime: row.StartTime, - NextExecution: row.NextExecution, - State: row.State, - } +type Tx struct { + *DAL } -// GetCronJobs returns all cron jobs for deployments with min replicas > 0 -func (d *DAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { - rows, err := d.db.GetCronJobs(ctx) +func (d *DAL) Begin(ctx context.Context) (*Tx, error) { + tx, err := d.db.Begin(ctx) if err != nil { - return nil, fmt.Errorf("failed to get cron jobs: %w", dalerrs.TranslatePGError(err)) + return nil, fmt.Errorf("failed to begin transaction: %w", dalerrs.TranslatePGError(err)) } - return slices.Map(rows, cronJobFromRow), nil + return &Tx{DAL: &DAL{db: tx}}, nil } -type AttemptedCronJob struct { - DidStartExecution bool - HasMinReplicas bool - model.CronJob +func (t *Tx) CommitOrRollback(ctx context.Context, err *error) { + tx, ok := t.db.(*sql.Tx) + if !ok { + panic("inconceivable") + } + tx.CommitOrRollback(ctx, err) } 
-// StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row -func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error) { - if len(jobs) == 0 { - return nil, nil +func (t *Tx) Commit(ctx context.Context) error { + tx, ok := t.db.(*sql.Tx) + if !ok { + panic("inconcievable") } - rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job model.CronJob) string { return job.Key.String() })) + err := tx.Commit(ctx) if err != nil { - return nil, fmt.Errorf("failed to start cron jobs: %w", dalerrs.TranslatePGError(err)) + return fmt.Errorf("failed to commit transaction: %w", dalerrs.TranslatePGError(err)) } + return nil +} - attemptedJobs = []AttemptedCronJob{} - for _, row := range rows { - job := AttemptedCronJob{ - CronJob: model.CronJob{ - Key: row.Key, - DeploymentKey: row.DeploymentKey, - Verb: schema.Ref{Module: row.Module, Name: row.Verb}, - Schedule: row.Schedule, - StartTime: row.StartTime, - NextExecution: row.NextExecution, - State: row.State, - }, - DidStartExecution: row.Updated, - HasMinReplicas: row.HasMinReplicas, - } - attemptedJobs = append(attemptedJobs, job) +func (t *Tx) Rollback(ctx context.Context) error { + tx, ok := t.db.(*sql.Tx) + if !ok { + panic("inconcievable") + } + err := tx.Rollback(ctx) + if err != nil { + return fmt.Errorf("failed to rollback transaction: %w", dalerrs.TranslatePGError(err)) + } + return nil +} + +func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob { + return model.CronJob{ + Key: c.Key, + DeploymentKey: d.Key, + Verb: schema.Ref{Module: c.ModuleName, Name: c.Verb}, + Schedule: c.Schedule, + StartTime: c.StartTime, + NextExecution: c.NextExecution, + LastExecution: c.LastExecution, } - return attemptedJobs, nil } -// EndCronJob sets the status from executing to idle and updates the next execution time -// Can be called on the successful completion of a job, or 
if the job failed to execute (error or timeout) -func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { - row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime) +// CreateAsyncCall creates an async_call row and returns its id +func (d *DAL) CreateAsyncCall(ctx context.Context, params sql.CreateAsyncCallParams) (int64, error) { + id, err := d.db.CreateAsyncCall(ctx, params) if err != nil { - return model.CronJob{}, fmt.Errorf("failed to end cron job: %w", dalerrs.TranslatePGError(err)) + return 0, fmt.Errorf("failed to create async call: %w", dalerrs.TranslatePGError(err)) + } + observability.AsyncCalls.Created(ctx, params.Verb, optional.None[schema.RefKey](), params.Origin, 0, err) + queueDepth, err := d.db.AsyncCallQueueDepth(ctx) + if err == nil { + // Don't error out of an FSM transition just over a queue depth retrieval + // error because this is only used for an observability gauge. + observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth) } - return cronJobFromRow(sql.GetCronJobsRow(row)), nil + return id, nil } -// GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration -func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { - rows, err := d.db.GetStaleCronJobs(ctx, sqltypes.Duration(duration)) +// GetUnscheduledCronJobs returns all cron_jobs rows with start_time before provided startTime for +// deployments with min replicas > 0 with no pending corresponding async_calls after last_execution +func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]model.CronJob, error) { + rows, err := d.db.GetUnscheduledCronJobs(ctx, startTime) if err != nil { - return nil, fmt.Errorf("failed to get stale cron jobs: %w", dalerrs.TranslatePGError(err)) + return nil, fmt.Errorf("failed to get cron jobs: %w", dalerrs.TranslatePGError(err)) } - return slices.Map(rows, func(row sql.GetStaleCronJobsRow) 
model.CronJob { - return cronJobFromRow(sql.GetCronJobsRow(row)) + return slices.Map(rows, func(r sql.GetUnscheduledCronJobsRow) model.CronJob { + return cronJobFromRow(r.CronJob, r.Deployment) }), nil } + +// GetCronJobByKey returns a cron_job row by its key +func (d *DAL) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (model.CronJob, error) { + row, err := d.db.GetCronJobByKey(ctx, key) + if err != nil { + return model.CronJob{}, fmt.Errorf("failed to get cron job %q: %w", key, dalerrs.TranslatePGError(err)) + } + return cronJobFromRow(row.CronJob, row.Deployment), nil +} + +// IsCronJobPending returns whether this cron job is executing or scheduled in async_calls +func (d *DAL) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + pending, err := d.db.IsCronJobPending(ctx, key, startTime) + if err != nil { + return false, fmt.Errorf("failed to check if cron job %q is pending: %w", key, dalerrs.TranslatePGError(err)) + } + return pending, nil +} + +// UpdateCronJobExecution updates the last_async_call_id, last_execution, and next_execution of +// the cron job given by the provided key +func (d *DAL) UpdateCronJobExecution(ctx context.Context, params sql.UpdateCronJobExecutionParams) error { + err := d.db.UpdateCronJobExecution(ctx, params) + if err != nil { + return fmt.Errorf("failed to update cron job %q: %w", params.Key, dalerrs.TranslatePGError(err)) + } + return nil +} diff --git a/backend/controller/cronjobs/sql/async_queries.sql.go b/backend/controller/cronjobs/sql/async_queries.sql.go new file mode 100644 index 0000000000..181199435a --- /dev/null +++ b/backend/controller/cronjobs/sql/async_queries.sql.go @@ -0,0 +1,89 @@ +// Code generated by sqlc. DO NOT EDIT. 
+// versions: +// sqlc v1.27.0 +// source: async_queries.sql + +package sql + +import ( + "context" + "encoding/json" + "time" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" + "github.com/alecthomas/types/optional" +) + +const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') +` + +func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) + var count int64 + err := row.Scan(&count) + return count, err +} + +const createAsyncCall = `-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context +) +VALUES ( + $1::TIMESTAMPTZ, + $2, + $3, + $4, + $5, + $6::interval, + $7::interval, + $8, + $9, + $10::jsonb +) +RETURNING id +` + +type CreateAsyncCallParams struct { + ScheduledAt time.Time + Verb schema.RefKey + Origin string + Request encryption.EncryptedAsyncColumn + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext json.RawMessage +} + +func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createAsyncCall, + arg.ScheduledAt, + arg.Verb, + arg.Origin, + arg.Request, + arg.RemainingAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/backend/controller/cronjobs/sql/conn.go b/backend/controller/cronjobs/sql/conn.go index 065487cefa..62699e9ef6 100644 --- a/backend/controller/cronjobs/sql/conn.go +++ 
b/backend/controller/cronjobs/sql/conn.go @@ -1,12 +1,21 @@ package sql +import ( + "context" + "database/sql" + "errors" + "fmt" +) + type DBI interface { Querier Conn() ConnI + Begin(ctx context.Context) (*Tx, error) } type ConnI interface { DBTX + Begin() (*sql.Tx, error) } type DB struct { @@ -19,3 +28,67 @@ func NewDB(conn ConnI) *DB { } func (d *DB) Conn() ConnI { return d.conn } + +func (d *DB) Begin(ctx context.Context) (*Tx, error) { + tx, err := d.conn.Begin() + if err != nil { + return nil, fmt.Errorf("beginning transaction: %w", err) + } + return &Tx{tx: tx, Queries: New(tx)}, nil +} + +type noopSubConn struct { + DBTX +} + +func (noopSubConn) Begin() (*sql.Tx, error) { + return nil, errors.New("sql: not implemented") +} + +type Tx struct { + tx *sql.Tx + *Queries +} + +func (t *Tx) Conn() ConnI { return noopSubConn{t.tx} } + +func (t *Tx) Tx() *sql.Tx { return t.tx } + +func (t *Tx) Begin(ctx context.Context) (*Tx, error) { + return nil, fmt.Errorf("cannot nest transactions") +} + +func (t *Tx) Commit(ctx context.Context) error { + err := t.tx.Commit() + if err != nil { + return fmt.Errorf("committing transaction: %w", err) + } + + return nil +} + +func (t *Tx) Rollback(ctx context.Context) error { + err := t.tx.Rollback() + if err != nil { + return fmt.Errorf("rolling back transaction: %w", err) + } + + return nil +} + +// CommitOrRollback can be used in a defer statement to commit or rollback a +// transaction depending on whether the enclosing function returned an error. +// +// func myFunc() (err error) { +// tx, err := db.Begin(ctx) +// if err != nil { return err } +// defer tx.CommitOrRollback(ctx, &err) +// ... 
+// } +func (t *Tx) CommitOrRollback(ctx context.Context, err *error) { + if *err != nil { + *err = errors.Join(*err, t.Rollback(ctx)) + } else { + *err = t.Commit(ctx) + } +} diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index c8b93b436c..039df94a07 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -106,48 +106,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. 
-func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( @@ -401,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState - ModuleName string + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/backend/controller/cronjobs/sql/querier.go b/backend/controller/cronjobs/sql/querier.go index 2bfa010982..05c77d7f4c 100644 --- a/backend/controller/cronjobs/sql/querier.go +++ b/backend/controller/cronjobs/sql/querier.go @@ -8,16 +8,17 @@ import ( "context" "time" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" ) type Querier interface { + AsyncCallQueueDepth(ctx context.Context) (int64, error) + CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error - EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) - GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) - GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) - StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) + GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) + GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) + IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) + UpdateCronJobExecution(ctx 
context.Context, arg UpdateCronJobExecutionParams) error } var _ Querier = (*Queries)(nil) diff --git a/backend/controller/cronjobs/sql/queries.sql b/backend/controller/cronjobs/sql/queries.sql index b16589ff2c..a3a3ec627a 100644 --- a/backend/controller/cronjobs/sql/queries.sql +++ b/backend/controller/cronjobs/sql/queries.sql @@ -1,8 +1,26 @@ --- name: GetCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +-- name: GetUnscheduledCronJobs :many +SELECT sqlc.embed(j), sqlc.embed(d) FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id -WHERE d.min_replicas > 0; +WHERE d.min_replicas > 0 + AND j.start_time < sqlc.arg('start_time')::TIMESTAMPTZ + AND ( + j.last_async_call_id IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') + ) + ) +FOR UPDATE SKIP LOCKED; + +-- name: GetCronJobByKey :one +SELECT sqlc.embed(j), sqlc.embed(d) +FROM cron_jobs j + INNER JOIN deployments d on j.deployment_id = d.id +WHERE j.key = sqlc.arg('key')::cron_job_key +FOR UPDATE SKIP LOCKED; -- name: CreateCronJob :exec INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) @@ -15,45 +33,19 @@ INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_ti sqlc.arg('start_time')::TIMESTAMPTZ, sqlc.arg('next_execution')::TIMESTAMPTZ); --- name: StartCronJobs :many -WITH updates AS ( - UPDATE cron_jobs - SET state = 'executing', - start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE key = ANY (sqlc.arg('keys')) - AND state = 'idle' - AND start_time < next_execution - AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, key, state, start_time, next_execution) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, - COALESCE(u.start_time, j.start_time) as 
start_time, - COALESCE(u.next_execution, j.next_execution) as next_execution, - COALESCE(u.state, j.state) as state, - d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id - LEFT JOIN updates u on j.id = u.id -WHERE j.key = ANY (sqlc.arg('keys')); - --- name: EndCronJob :one -WITH j AS ( +-- name: UpdateCronJobExecution :exec UPDATE cron_jobs - SET state = 'idle', + SET last_async_call_id = sqlc.arg('last_async_call_id')::BIGINT, + last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ, next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ - WHERE key = sqlc.arg('key')::cron_job_key - AND state = 'executing' - AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ - RETURNING * -) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state - FROM j - INNER JOIN deployments d on j.deployment_id = d.id - LIMIT 1; + WHERE key = sqlc.arg('key')::cron_job_key; --- name: GetStaleCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id -WHERE state = 'executing' - AND start_time < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL; +-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = sqlc.arg('key')::cron_job_key + AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending; diff --git a/backend/controller/cronjobs/sql/queries.sql.go b/backend/controller/cronjobs/sql/queries.sql.go index 1064924f55..af6e4634b2 100644 --- a/backend/controller/cronjobs/sql/queries.sql.go +++ b/backend/controller/cronjobs/sql/queries.sql.go @@ -9,9 +9,7 @@ import ( "context" "time" - 
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" - "github.com/lib/pq" ) const createCronJob = `-- name: CreateCronJob :exec @@ -49,85 +47,94 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } -const endCronJob = `-- name: EndCronJob :one -WITH j AS ( -UPDATE cron_jobs - SET state = 'idle', - next_execution = $1::TIMESTAMPTZ - WHERE key = $2::cron_job_key - AND state = 'executing' - AND start_time = $3::TIMESTAMPTZ - RETURNING id, key, deployment_id, verb, schedule, start_time, next_execution, state, module_name -) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state - FROM j +const getCronJobByKey = `-- name: GetCronJobByKey :one +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id - LIMIT 1 +WHERE j.key = $1::cron_job_key +FOR UPDATE SKIP LOCKED ` -type EndCronJobRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState +type GetCronJobByKeyRow struct { + CronJob CronJob + Deployment Deployment } -func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRowContext(ctx, endCronJob, nextExecution, key, startTime) - var i EndCronJobRow +func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) { + row := q.db.QueryRowContext(ctx, getCronJobByKey, key) + var i GetCronJobByKeyRow err := row.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - 
&i.StartTime, - &i.NextExecution, - &i.State, + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, ) return i, err } -const getCronJobs = `-- name: GetCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +const getUnscheduledCronJobs = `-- name: GetUnscheduledCronJobs :many +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 + AND j.start_time < $1::TIMESTAMPTZ + AND ( + j.last_async_call_id IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') + ) + ) +FOR UPDATE SKIP LOCKED ` -type GetCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState +type GetUnscheduledCronJobsRow struct { + CronJob CronJob + Deployment Deployment } -func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, getCronJobs) +func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) { + rows, err := q.db.QueryContext(ctx, getUnscheduledCronJobs, startTime) if err != nil { return nil, err } defer rows.Close() - var items []GetCronJobsRow + var items 
[]GetUnscheduledCronJobsRow for rows.Next() { - var i GetCronJobsRow + var i GetUnscheduledCronJobsRow if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, ); err != nil { return nil, err } @@ -142,122 +149,45 @@ func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { return items, nil } -const getStaleCronJobs = `-- name: GetStaleCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id -WHERE state = 'executing' - AND start_time < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending ` -type GetStaleCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState -} - -func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, getStaleCronJobs, dollar_1) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetStaleCronJobsRow - for rows.Next() { - var i GetStaleCronJobsRow - if err := 
rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err } -const startCronJobs = `-- name: StartCronJobs :many -WITH updates AS ( - UPDATE cron_jobs - SET state = 'executing', - start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE key = ANY ($1) - AND state = 'idle' - AND start_time < next_execution - AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, key, state, start_time, next_execution) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, - COALESCE(u.start_time, j.start_time) as start_time, - COALESCE(u.next_execution, j.next_execution) as next_execution, - COALESCE(u.state, j.state) as state, - d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id - LEFT JOIN updates u on j.id = u.id -WHERE j.key = ANY ($1) +const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec +UPDATE cron_jobs + SET last_async_call_id = $1::BIGINT, + last_execution = $2::TIMESTAMPTZ, + next_execution = $3::TIMESTAMPTZ + WHERE key = $4::cron_job_key ` -type StartCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState - HasMinReplicas bool - Updated bool +type UpdateCronJobExecutionParams struct { 
+ LastAsyncCallID int64 + LastExecution time.Time + NextExecution time.Time + Key model.CronJobKey } -func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, startCronJobs, pq.Array(keys)) - if err != nil { - return nil, err - } - defer rows.Close() - var items []StartCronJobsRow - for rows.Next() { - var i StartCronJobsRow - if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - &i.HasMinReplicas, - &i.Updated, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil +func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error { + _, err := q.db.ExecContext(ctx, updateCronJobExecution, + arg.LastAsyncCallID, + arg.LastExecution, + arg.NextExecution, + arg.Key, + ) + return err } diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go deleted file mode 100644 index cda09241d1..0000000000 --- a/backend/controller/cronjobs/state.go +++ /dev/null @@ -1,83 +0,0 @@ -package cronjobs - -import ( - "time" - - "github.com/alecthomas/types/optional" - - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/slices" -) - -// state models the state of the cron job service's private state for scheduling jobs and reacting to events -type state struct { - jobs []model.CronJob - - // Used to determine if this controller is currently executing a job - executing map[string]bool - - // Newly created jobs should be attempted by the controller that created them until other controllers - // have a chance to resync their job lists and share responsibilities through the hash ring - newJobs map[string]time.Time - - // We delay any job attempts in case of db errors to avoid 
hammering the db in a tight loop - blockedUntil time.Time -} - -func (s *state) isExecutingInCurrentController(job model.CronJob) bool { - return s.executing[job.Key.String()] -} - -func (s *state) startedExecutingJob(job model.CronJob) { - s.executing[job.Key.String()] = true -} - -func (s *state) isJobTooNewForHashRing(job model.CronJob) bool { - if t, ok := s.newJobs[job.Key.String()]; ok { - if time.Since(t) < newJobHashRingOverrideInterval { - return true - } - delete(s.newJobs, job.Key.String()) - } - return false -} - -func (s *state) sync(jobs []model.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { - s.jobs = make([]model.CronJob, len(jobs)) - copy(s.jobs, jobs) - for _, job := range s.jobs { - if job.State != model.CronJobStateExecuting { - delete(s.executing, job.Key.String()) - } - if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() { - // This job is new and should be attempted by the current controller - s.newJobs[job.Key.String()] = time.Now() - } - } -} - -func (s *state) updateJobs(jobs []model.CronJob) { - updatedJobMap := jobMap(jobs) - for idx, old := range s.jobs { - if updated, exists := updatedJobMap[old.Key.String()]; exists { - s.jobs[idx] = updated - if updated.State != model.CronJobStateExecuting { - delete(s.executing, updated.Key.String()) - } - } - } -} - -func (s *state) removeDeploymentKey(key model.DeploymentKey) { - s.jobs = slices.Filter(s.jobs, func(j model.CronJob) bool { - return j.DeploymentKey.String() != key.String() - }) -} - -func jobMap(jobs []model.CronJob) map[string]model.CronJob { - m := map[string]model.CronJob{} - for _, job := range jobs { - m[job.Key.String()] = job - } - return m -} diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 6358c90283..5c621ffeb0 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -7,6 +7,7 @@ import ( "time" 
"github.com/alecthomas/participle/v2" + "github.com/alecthomas/participle/v2/lexer" "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" @@ -15,14 +16,22 @@ import ( dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/encryption" + "github.com/TBD54566975/ftl/internal/model" ) type asyncOriginParseRoot struct { Key AsyncOrigin `parser:"@@"` } +var asyncOriginLexer = lexer.MustSimple([]lexer.SimpleRule{ + {"NumberIdent", `[0-9][a-zA-Z0-9_-]*`}, + {"Ident", `[a-zA-Z_][a-zA-Z0-9_-]*`}, + {"Punct", `[:.]`}, +}) + var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot]( - participle.Union[AsyncOrigin](AsyncOriginFSM{}, AsyncOriginPubSub{}), + participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginFSM{}, AsyncOriginPubSub{}), + participle.Lexer(asyncOriginLexer), ) // AsyncOrigin is a sum type representing the originator of an async call. @@ -35,6 +44,19 @@ type AsyncOrigin interface { String() string } +// AsyncOriginCron represents the context for the originator of a cron async call. +// +// It is in the form cron:. +type AsyncOriginCron struct { + CronJobKey model.CronJobKey `parser:"'cron' ':' @(~EOF)+"` +} + +var _ AsyncOrigin = AsyncOriginCron{} + +func (AsyncOriginCron) asyncOrigin() {} +func (a AsyncOriginCron) Origin() string { return "cron" } +func (a AsyncOriginCron) String() string { return fmt.Sprintf("cron:%s", a.CronJobKey) } + // AsyncOriginFSM represents the context for the originator of an FSM async call. 
// // It is in the form fsm:<module>.<name>:<key> diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index c1ea520cc8..b154e8da0b 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -8,8 +8,10 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" dalerrs "github.com/TBD54566975/ftl/backend/dal" + "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" ) func TestNoCallToAcquire(t *testing.T) { @@ -22,3 +24,28 @@ func TestNoCallToAcquire(t *testing.T) { assert.IsError(t, err, dalerrs.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") } + +func TestParser(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input string + expected AsyncOrigin + }{ + {"Cron", `cron:crn-initial-verb0Cron-5eq2ivpmuv0lvnoc`, AsyncOriginCron{ + CronJobKey: model.CronJobKey{ + Payload: model.CronJobPayload{Module: "initial", Verb: "verb0Cron"}, + Suffix: []byte("\xfd7\xe6*\xfcƹ\xe9.\x9c"), + }}}, + {"FSM", `fsm:module.name:key`, AsyncOriginFSM{FSM: schema.RefKey{Module: "module", Name: "name"}, Key: "key"}}, + {"PubSub", `sub:module.topic`, AsyncOriginPubSub{Subscription: schema.RefKey{Module: "module", Name: "topic"}}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + origin, err := asyncOriginParser.ParseString("", tt.input) + assert.NoError(t, err) + assert.Equal(t, tt.expected, origin.Key) + }) + } +} diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index af8e9bf008..8b694f8b48 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -45,6 +45,7 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanc // Create an async call for the event.
origin := AsyncOriginFSM{FSM: fsm, Key: instanceKey} asyncCallID, err := d.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ + ScheduledAt: time.Now(), Verb: destinationState, Origin: origin.String(), Request: encryptedRequest, diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 795201353b..924ef17834 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -128,6 +128,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t } _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ + ScheduledAt: time.Now(), Verb: subscriber.Sink, Origin: origin.String(), Request: nextCursor.Payload, // already encrypted diff --git a/backend/controller/observability/async_calls.go b/backend/controller/observability/async_calls.go index 471cc557c4..e37e51fc73 100644 --- a/backend/controller/observability/async_calls.go +++ b/backend/controller/observability/async_calls.go @@ -107,6 +107,11 @@ func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, cat m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...)) } +// AcquireFailed should be called if an acquisition failed before any call data could be retrieved. 
+func (m *AsyncCallMetrics) AcquireFailed(ctx context.Context, err error) { + m.acquired.Add(ctx, 1, metric.WithAttributes(observability.SuccessOrFailureStatusAttr(false))) +} + func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool, maybeFailureMode optional.Option[string]) { attrs := extractAsyncCallAttrs(verb, catchVerb, origin, scheduledAt, isCatching) diff --git a/backend/controller/sql/async_queries.sql b/backend/controller/sql/async_queries.sql new file mode 100644 index 0000000000..5f2f3e2000 --- /dev/null +++ b/backend/controller/sql/async_queries.sql @@ -0,0 +1,31 @@ +-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context +) +VALUES ( + @scheduled_at::TIMESTAMPTZ, + @verb, + @origin, + @request, + @remaining_attempts, + @backoff::interval, + @max_backoff::interval, + @catch_verb, + @parent_request_key, + @trace_context::jsonb +) +RETURNING id; + +-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc'); diff --git a/backend/controller/sql/async_queries.sql.go b/backend/controller/sql/async_queries.sql.go new file mode 100644 index 0000000000..181199435a --- /dev/null +++ b/backend/controller/sql/async_queries.sql.go @@ -0,0 +1,89 @@ +// Code generated by sqlc. DO NOT EDIT. 
+// versions: +// sqlc v1.27.0 +// source: async_queries.sql + +package sql + +import ( + "context" + "encoding/json" + "time" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" + "github.com/alecthomas/types/optional" +) + +const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') +` + +func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) + var count int64 + err := row.Scan(&count) + return count, err +} + +const createAsyncCall = `-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context +) +VALUES ( + $1::TIMESTAMPTZ, + $2, + $3, + $4, + $5, + $6::interval, + $7::interval, + $8, + $9, + $10::jsonb +) +RETURNING id +` + +type CreateAsyncCallParams struct { + ScheduledAt time.Time + Verb schema.RefKey + Origin string + Request encryption.EncryptedAsyncColumn + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext json.RawMessage +} + +func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createAsyncCall, + arg.ScheduledAt, + arg.Verb, + arg.Origin, + arg.Request, + arg.RemainingAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index c8b93b436c..039df94a07 100644 --- a/backend/controller/sql/models.go +++ 
b/backend/controller/sql/models.go @@ -106,48 +106,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( @@ -401,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState - ModuleName string + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 9affa69406..33293fc322 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -39,7 +39,6 @@ type Querier interface { DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, 
error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) - EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) ExpireLeases(ctx context.Context) (int64, error) ExpireRunnerReservations(ctx context.Context) (int64, error) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) @@ -55,7 +54,7 @@ type Querier interface { GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) // Return the digests that exist in the database. GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) - GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) + GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) // Get all artefacts matching the given digests. 
GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) @@ -83,7 +82,6 @@ type Querier interface { GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) - GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) @@ -91,12 +89,14 @@ type Querier interface { GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) + GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error + IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) // Mark any controller entries that haven't been updated recently as dead. 
KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) @@ -111,13 +111,13 @@ type Querier interface { SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error SetNextFSMEvent(ctx context.Context, arg SetNextFSMEventParams) (int64, error) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error - StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) // Start a new FSM transition, populating the destination state and async call ID. // // "key" is the unique identifier for the FSM execution. StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) SucceedAsyncCall(ctx context.Context, response encryption.OptionalEncryptedAsyncColumn, iD int64) (bool, error) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) + UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 918f8a3cb2..b930a8a250 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -470,36 +470,6 @@ FROM expired; -- name: GetLeaseInfo :one SELECT expires_at, metadata FROM leases WHERE key = @key::lease_key; --- name: CreateAsyncCall :one -INSERT INTO async_calls ( - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context -) -VALUES ( - @verb, - @origin, - 
@request, - @remaining_attempts, - @backoff::interval, - @max_backoff::interval, - @catch_verb, - @parent_request_key, - @trace_context::jsonb -) -RETURNING id; - --- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc'); - -- name: AcquireAsyncCall :one -- Reserve a pending async call for execution, returning the associated lease -- reservation key and accompanying metadata. diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index b0810f039f..dab7ab0350 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -127,19 +127,6 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc return err } -const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') -` - -func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) - var count int64 - err := row.Scan(&count) - return count, err -} - const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context @@ -190,61 +177,6 @@ func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []b return id, err } -const createAsyncCall = `-- name: CreateAsyncCall :one -INSERT INTO async_calls ( - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context -) -VALUES ( - $1, - $2, - $3, - $4, - $5::interval, - $6::interval, - $7, - $8, - $9::jsonb -) -RETURNING id -` - -type CreateAsyncCallParams struct { - Verb schema.RefKey - Origin string - Request encryption.EncryptedAsyncColumn - RemainingAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - 
CatchVerb optional.Option[schema.RefKey] - ParentRequestKey optional.Option[string] - TraceContext json.RawMessage -} - -func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { - row := q.db.QueryRowContext(ctx, createAsyncCall, - arg.Verb, - arg.Origin, - arg.Request, - arg.RemainingAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - arg.ParentRequestKey, - arg.TraceContext, - ) - var id int64 - err := row.Scan(&id) - return id, err -} - const createCronJob = `-- name: CreateCronJob :exec INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) VALUES ( @@ -436,49 +368,6 @@ func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (in return count, err } -const endCronJob = `-- name: EndCronJob :one -WITH j AS ( -UPDATE cron_jobs - SET state = 'idle', - next_execution = $1::TIMESTAMPTZ - WHERE key = $2::cron_job_key - AND state = 'executing' - AND start_time = $3::TIMESTAMPTZ - RETURNING id, key, deployment_id, verb, schedule, start_time, next_execution, state, module_name -) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state - FROM j - INNER JOIN deployments d on j.deployment_id = d.id - LIMIT 1 -` - -type EndCronJobRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState -} - -func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRowContext(ctx, endCronJob, nextExecution, key, startTime) - var i EndCronJobRow - err := row.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - ) - return i, err -} - const expireLeases = `-- name: ExpireLeases :one WITH expired 
AS ( DELETE FROM leases @@ -902,54 +791,42 @@ func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]G return items, nil } -const getCronJobs = `-- name: GetCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +const getCronJobByKey = `-- name: GetCronJobByKey :one +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id -WHERE d.min_replicas > 0 +WHERE j.key = $1::cron_job_key +FOR UPDATE SKIP LOCKED ` -type GetCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState +type GetCronJobByKeyRow struct { + CronJob CronJob + Deployment Deployment } -func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, getCronJobs) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetCronJobsRow - for rows.Next() { - var i GetCronJobsRow - if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil +func (q *Queries) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) { + row := q.db.QueryRowContext(ctx, getCronJobByKey, key) + var i GetCronJobByKeyRow + err := row.Scan( + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, 
+ &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, + ) + return i, err } const getDeployment = `-- name: GetDeployment :one @@ -1777,57 +1654,6 @@ func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.Deployme return schema, err } -const getStaleCronJobs = `-- name: GetStaleCronJobs :many -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id -WHERE state = 'executing' - AND start_time < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL -` - -type GetStaleCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState -} - -func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, getStaleCronJobs, dollar_1) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetStaleCronJobsRow - for rows.Next() { - var i GetStaleCronJobsRow - if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getSubscription = `-- name: GetSubscription :one WITH module AS ( SELECT id @@ -1954,6 +1780,70 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent return i, err } +const getUnscheduledCronJobs = `-- name: 
GetUnscheduledCronJobs :many +SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas +FROM cron_jobs j + INNER JOIN deployments d on j.deployment_id = d.id +WHERE d.min_replicas > 0 + AND j.start_time < $1::TIMESTAMPTZ + AND ( + j.last_async_call_id IS NULL + OR NOT EXISTS ( + SELECT 1 + FROM async_calls ac + WHERE ac.id = j.last_async_call_id + AND ac.state IN ('pending', 'executing') + ) + ) +FOR UPDATE SKIP LOCKED +` + +type GetUnscheduledCronJobsRow struct { + CronJob CronJob + Deployment Deployment +} + +func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) { + rows, err := q.db.QueryContext(ctx, getUnscheduledCronJobs, startTime) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetUnscheduledCronJobsRow + for rows.Next() { + var i GetUnscheduledCronJobsRow + if err := rows.Scan( + &i.CronJob.ID, + &i.CronJob.Key, + &i.CronJob.DeploymentID, + &i.CronJob.Verb, + &i.CronJob.Schedule, + &i.CronJob.StartTime, + &i.CronJob.NextExecution, + &i.CronJob.ModuleName, + &i.CronJob.LastExecution, + &i.CronJob.LastAsyncCallID, + &i.Deployment.ID, + &i.Deployment.CreatedAt, + &i.Deployment.ModuleID, + &i.Deployment.Key, + &i.Deployment.Schema, + &i.Deployment.Labels, + &i.Deployment.MinReplicas, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertSubscriber = `-- name: InsertSubscriber :exec INSERT INTO topic_subscribers ( key, @@ -2224,6 +2114,24 @@ func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimeline return err } +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + 
INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending +` + +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err +} + const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller @@ -2510,75 +2418,6 @@ func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.Subsc return err } -const startCronJobs = `-- name: StartCronJobs :many -WITH updates AS ( - UPDATE cron_jobs - SET state = 'executing', - start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE key = ANY ($1) - AND state = 'idle' - AND start_time < next_execution - AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, key, state, start_time, next_execution) -SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, - COALESCE(u.start_time, j.start_time) as start_time, - COALESCE(u.next_execution, j.next_execution) as next_execution, - COALESCE(u.state, j.state) as state, - d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated -FROM cron_jobs j - INNER JOIN deployments d on j.deployment_id = d.id - LEFT JOIN updates u on j.id = u.id -WHERE j.key = ANY ($1) -` - -type StartCronJobsRow struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Module string - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState - HasMinReplicas bool - Updated bool -} - -func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { - rows, err := q.db.QueryContext(ctx, startCronJobs, pq.Array(keys)) - if err != nil { - return nil, err - 
} - defer rows.Close() - var items []StartCronJobsRow - for rows.Next() { - var i StartCronJobsRow - if err := rows.Scan( - &i.Key, - &i.DeploymentKey, - &i.Module, - &i.Verb, - &i.Schedule, - &i.StartTime, - &i.NextExecution, - &i.State, - &i.HasMinReplicas, - &i.Updated, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const startFSMTransition = `-- name: StartFSMTransition :one INSERT INTO fsm_instances ( fsm, @@ -2671,6 +2510,31 @@ func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key return column_1, err } +const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec +UPDATE cron_jobs + SET last_async_call_id = $1::BIGINT, + last_execution = $2::TIMESTAMPTZ, + next_execution = $3::TIMESTAMPTZ + WHERE key = $4::cron_job_key +` + +type UpdateCronJobExecutionParams struct { + LastAsyncCallID int64 + LastExecution time.Time + NextExecution time.Time + Key model.CronJobKey +} + +func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error { + _, err := q.db.ExecContext(ctx, updateCronJobExecution, + arg.LastAsyncCallID, + arg.LastExecution, + arg.NextExecution, + arg.Key, + ) + return err +} + const updateEncryptionVerification = `-- name: UpdateEncryptionVerification :exec UPDATE encryption_keys SET verify_timeline = $1, diff --git a/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql new file mode 100644 index 0000000000..b67ba4ead9 --- /dev/null +++ b/backend/controller/sql/schema/20240815164808_async_calls_cron_job_key.sql @@ -0,0 +1,16 @@ +-- migrate:up + +ALTER TABLE cron_jobs + DROP COLUMN state, + ADD COLUMN last_execution TIMESTAMPTZ, + ADD COLUMN last_async_call_id BIGINT; + +ALTER TABLE cron_jobs + ADD CONSTRAINT 
fk_cron_jobs_last_async_call_id + FOREIGN KEY (last_async_call_id) REFERENCES async_calls(id) + ON DELETE SET NULL; + +DROP TYPE cron_job_state; + +-- migrate:down + diff --git a/go.mod b/go.mod index 1e70a43dbd..0ab42f3a30 100644 --- a/go.mod +++ b/go.mod @@ -138,7 +138,6 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b github.com/swaggest/refl v1.3.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect golang.org/x/crypto v0.26.0 // indirect diff --git a/go.sum b/go.sum index a62e91e5db..a4d3baf458 100644 --- a/go.sum +++ b/go.sum @@ -231,8 +231,6 @@ github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= -github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= -github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sourcegraph/jsonrpc2 v0.2.0 h1:KjN/dC4fP6aN9030MZCJs9WQbTOjWHhrtKVpzzSrr/U= diff --git a/internal/configuration/sql/models.go b/internal/configuration/sql/models.go index c8b93b436c..039df94a07 100644 --- a/internal/configuration/sql/models.go +++ b/internal/configuration/sql/models.go @@ -106,48 +106,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type CronJobState string - -const ( - CronJobStateIdle 
CronJobState = "idle" - CronJobStateExecuting CronJobState = "executing" -) - -func (e *CronJobState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = CronJobState(s) - case string: - *e = CronJobState(s) - default: - return fmt.Errorf("unsupported scan type for CronJobState: %T", src) - } - return nil -} - -type NullCronJobState struct { - CronJobState CronJobState - Valid bool // Valid is true if CronJobState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullCronJobState) Scan(value interface{}) error { - if value == nil { - ns.CronJobState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.CronJobState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullCronJobState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.CronJobState), nil -} - type EventType string const ( @@ -401,15 +359,16 @@ type Controller struct { } type CronJob struct { - ID int64 - Key model.CronJobKey - DeploymentID int64 - Verb string - Schedule string - StartTime time.Time - NextExecution time.Time - State model.CronJobState - ModuleName string + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] } type Deployment struct { diff --git a/internal/model/cron_job.go b/internal/model/cron_job.go index 8b190f437e..1f0fc084c4 100644 --- a/internal/model/cron_job.go +++ b/internal/model/cron_job.go @@ -3,14 +3,9 @@ package model import ( "time" - "github.com/TBD54566975/ftl/backend/schema" -) + "github.com/alecthomas/types/optional" -type CronJobState string - -const ( - CronJobStateIdle = "idle" - CronJobStateExecuting = "executing" + "github.com/TBD54566975/ftl/backend/schema" ) type CronJob struct { @@ -20,5 +15,5 @@ type CronJob struct { Schedule string StartTime 
time.Time NextExecution time.Time - State CronJobState + LastExecution optional.Option[time.Time] } diff --git a/sqlc.yaml b/sqlc.yaml index 185560d9c1..cb69797a10 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -4,6 +4,7 @@ sql: engine: "postgresql" queries: - backend/controller/sql/queries.sql + - backend/controller/sql/async_queries.sql # FIXME: Until we fully decouple cron from the controller, we need to include the cron queries here - backend/controller/cronjobs/sql/queries.sql schema: "backend/controller/sql/schema" @@ -86,8 +87,6 @@ sql: nullable: true go_type: type: "optional.Option[model.DeploymentKey]" - - db_type: "cron_job_state" - go_type: "github.com/TBD54566975/ftl/internal/model.CronJobState" - db_type: "controller_key" go_type: "github.com/TBD54566975/ftl/internal/model.ControllerKey" - db_type: "request_key" @@ -151,7 +150,9 @@ sql: # - postgresql-query-too-costly - postgresql-no-seq-scan - <<: *daldir - queries: "backend/controller/cronjobs/sql/queries.sql" + queries: + - backend/controller/cronjobs/sql/queries.sql + - backend/controller/sql/async_queries.sql gen: go: <<: *gengo