Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rewrite cron to use async system #2407

Merged
merged 15 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@ issues:
- "fmt.Errorf can be replaced with errors.New"
- "fmt.Sprintf can be replaced with string concatenation"
- "strings.Title has been deprecated"
- "error returned from external package is unwrapped.*TranslatePGError"
- "error returned from external package is unwrapped.*TranslatePGError"
- "struct literal uses unkeyed fields"
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
25 changes: 21 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub
Expand Down Expand Up @@ -541,7 +540,10 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)
err = s.cronJobs.CreatedOrReplacedDeloyment(ctx)
if err != nil {
return nil, fmt.Errorf("could not schedule cron jobs: %w", err)
}

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}
Expand Down Expand Up @@ -1403,7 +1405,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
if call == nil {
observability.AsyncCalls.AcquireFailed(ctx, err)
} else {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
}
return 0, err
}

Expand All @@ -1430,6 +1436,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if returnErr == nil {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
break

case dal.AsyncOriginFSM:
break

Expand Down Expand Up @@ -1568,6 +1577,9 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *

func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
return &ftlv1.Metadata{}

case dal.AsyncOriginFSM:
return &ftlv1.Metadata{
Values: []*ftlv1.Metadata_Pair{
Expand Down Expand Up @@ -1595,6 +1607,11 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
if err := s.cronJobs.OnJobCompletion(ctx, origin.CronJobKey, failed); err != nil {
return fmt.Errorf("failed to finalize cron async call: %w", err)
}

case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
Expand Down
Loading
Loading