Skip to content

Commit

Permalink
fix: remove cron jobs on deactivation
Browse files Browse the repository at this point in the history
fixes #3109
  • Loading branch information
stuartwdouglas committed Oct 15, 2024
1 parent 95651c7 commit 48ed49c
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 50 deletions.
7 changes: 2 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca

svc.registry = artefacts.New(conn)

svc.dal = dal.New(ctx, conn, encryption, pubSub)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

Expand Down Expand Up @@ -1164,13 +1162,12 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
if err != nil {
logger.Errorf(err, "Could not generate cron jobs for new deployment")
return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err)
}

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
Expand Down
31 changes: 20 additions & 11 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -47,35 +46,35 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource s
return svc
}

func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) {
func (s *Service) NewCronJobsForModule(ctx context.Context, module *schema.Module) ([]model.CronJob, error) {
logger := log.FromContext(ctx).Scope("cron").Module(module.Name)
start := s.clock.Now().UTC()
newJobs := []model.CronJob{}
merr := []error{}
for _, decl := range module.Decls {
verb, ok := decl.Value.(*schemapb.Decl_Verb)
verb, ok := decl.(*schema.Verb)
if !ok {
continue
}
for _, metadata := range verb.Verb.Metadata {
cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob)
for _, metadata := range verb.Metadata {
cronMetadata, ok := metadata.(*schema.MetadataCronJob)
if !ok {
continue
}
cronStr := cronMetadata.CronJob.Cron
cronStr := cronMetadata.Cron
schedule, err := cron.Parse(cronStr)
if err != nil {
merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, err))
continue
}
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err))
merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Name, schedule, err))
continue
}
newJobs = append(newJobs, model.CronJob{
Key: model.NewCronJobKey(module.Name, verb.Verb.Name),
Verb: schema.Ref{Module: module.Name, Name: verb.Verb.Name},
Key: model.NewCronJobKey(module.Name, verb.Name),
Verb: schema.Ref{Module: module.Name, Name: verb.Name},
Schedule: cronStr,
StartTime: start,
NextExecution: next,
Expand All @@ -96,15 +95,15 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) error {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("New deployment; scheduling cron jobs")
err := s.scheduleCronJobs(ctx)
err := s.ScheduleCronJobs(ctx)
if err != nil {
return fmt.Errorf("failed to schedule cron jobs: %w", err)
}
return nil
}

// scheduleCronJobs schedules all cron jobs that are not already scheduled.
func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
func (s *Service) ScheduleCronJobs(ctx context.Context) (err error) {
logger := log.FromContext(ctx).Scope("cron")
now := s.clock.Now().UTC()

Expand Down Expand Up @@ -237,3 +236,13 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr
})
return nil
}

func (s *Service) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("Deleting cron jobs for deployment %q", key)
err := s.dal.DeleteCronJobsForDeployment(ctx, key)
if err != nil {
return fmt.Errorf("failed to remove cron jobs for deployment %q: %w", key, err)
}
return nil
}
40 changes: 40 additions & 0 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package cronjobs
import (
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -35,3 +36,42 @@ func TestCron(t *testing.T) {
},
)
}

func TestCronIsRemoved(t *testing.T) {
dir := t.TempDir()
// We want to make sure that cron jobs are shut down when the deployment is updated
// And we don't end up with double invocations
// To test this we are going to remove the cron and turn it into a normal verb
// If the verb is still invoked after the redeploy then we have a problem
tmpFile := filepath.Join(dir, "cron.txt")
t.Setenv("DEST_FILE", tmpFile)

t.Cleanup(func() { _ = os.Remove(tmpFile) })

in.Run(t,
in.WithLanguages("go"),
in.CopyModule("cron"),
in.Deploy("cron"),
in.Wait("cron"),
in.Sleep(1*time.Second),
func(t testing.TB, ic in.TestContext) {
_, err := os.Stat(tmpFile)
assert.NoError(t, err)
data, err := os.ReadFile(tmpFile)
assert.NoError(t, err)
assert.True(t, strings.Contains(string(data), "Hello, world!"))
},
in.EditFile("cron/cron.go", func(content []byte) []byte {
ret := strings.ReplaceAll(string(content), "//ftl:cron * * * * * * *", "//ftl:verb")
ret = strings.ReplaceAll(ret, "Hello, world!", "NEW VERB")
return []byte(ret)
}),
in.Deploy("cron"),
func(t testing.TB, ic in.TestContext) {
time.Sleep(2 * time.Second)
data, err := os.ReadFile(tmpFile)
assert.NoError(t, err)
assert.False(t, strings.Contains(string(data), "NEW VERB"))
},
)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cronjobs
package internal

import (
"context"
Expand All @@ -13,6 +13,7 @@ import (
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
Expand Down Expand Up @@ -45,27 +46,45 @@ func TestNewCronJobsForModule(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri)))
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute

deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate)
decls := []schema.Decl{}
for _, job := range jobsToCreate {
decls = append(decls, &schema.Verb{
Name: job.Verb.Name,
Metadata: []schema.Metadata{&schema.MetadataCronJob{Cron: job.Schedule}},
Request: &schema.Unit{},
Response: &schema.Unit{},
})
}
moduleSchema := &schema.Module{
Name: moduleName,
Decls: decls,
}
deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", moduleSchema, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{})
assert.NoError(t, err)
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

// Progress so that start_time is valid
clk.Add(time.Second)
cjs := NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)
// All jobs need to be scheduled
expectUnscheduledJobs(t, dal, clk, 2)
unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now())
jobsByVerb := map[string]model.CronJob{}
for _, job := range unscheduledJobs {
jobsByVerb[job.Verb.Name] = job
}
for i := range jobsToCreate {
jobsToCreate[i].Key = jobsByVerb[jobsToCreate[i].Verb.Name].Key
}
assert.NoError(t, err)
assert.Equal(t, len(unscheduledJobs), 2)

Expand All @@ -74,7 +93,7 @@ func TestNewCronJobsForModule(t *testing.T) {
assert.IsError(t, err, libdal.ErrNotFound)
assert.EqualError(t, err, "no pending async calls: not found")

err = cjs.scheduleCronJobs(ctx)
err = cjs.ScheduleCronJobs(ctx)
assert.NoError(t, err)
expectUnscheduledJobs(t, dal, clk, 0)
for _, job := range jobsToCreate {
Expand Down
8 changes: 8 additions & 0 deletions backend/controller/cronjobs/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,11 @@ func (d *DAL) UpdateCronJobExecution(ctx context.Context, params UpdateCronJobEx
}
return nil
}

func (d *DAL) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error {
err := d.db.DeleteCronJobsForDeployment(ctx, key)
if err != nil {
return fmt.Errorf("failed to delete cron jobs for deployment %v: %w", key, libdal.TranslatePGError(err))
}
return nil
}
1 change: 1 addition & 0 deletions backend/controller/cronjobs/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/controller/cronjobs/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ SELECT EXISTS (
AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ
AND ac.state = 'pending'
) AS pending;

-- name: DeleteCronJobsForDeployment :exec
DELETE FROM cron_jobs
WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1);
10 changes: 10 additions & 0 deletions backend/controller/cronjobs/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 29 additions & 19 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/proto"

aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Reservation interface {
Rollback(ctx context.Context) error
}

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL {
func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service, cron *cronjobs.Service) *DAL {
var d *DAL
db := dalsql.New(conn)
d = &DAL{
Expand All @@ -69,6 +70,7 @@ func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Ser
encryption: d.encryption,
registry: aregistry.New(h.Connection),
DeploymentChanges: d.DeploymentChanges,
cronjobs: cron,
}
}),
DeploymentChanges: inprocesspubsub.New[DeploymentNotification](),
Expand All @@ -85,6 +87,7 @@ type DAL struct {
pubsub *pubsub.Service
encryption *encryption.Service
registry *aregistry.Service
cronjobs *cronjobs.Service

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification]
Expand Down Expand Up @@ -207,7 +210,7 @@ type IngressRoutingEntry struct {
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) {
logger := log.FromContext(ctx)

// Start the parent transaction
Expand Down Expand Up @@ -303,23 +306,6 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

for _, job := range cronJobs {
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := tx.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: deploymentKey,
ModuleName: job.Verb.Module,
Verb: job.Verb.Name,
StartTime: job.StartTime,
Schedule: job.Schedule,
NextExecution: job.NextExecution,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create cron job: %w", libdal.TranslatePGError(err))
}
}

return deploymentKey, nil
}

Expand Down Expand Up @@ -537,6 +523,26 @@ func (d *DAL) deploymentWillActivate(ctx context.Context, key model.DeploymentKe
if err != nil {
return fmt.Errorf("could not create subscribers: %w", err)
}
cronJobs, err := d.cronjobs.NewCronJobsForModule(ctx, module)
if err != nil {
return fmt.Errorf("could not create cron jobs: %w", err)
}
for _, job := range cronJobs {
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := d.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: key,
ModuleName: job.Verb.Module,
Verb: job.Verb.Name,
StartTime: job.StartTime,
Schedule: job.Schedule,
NextExecution: job.NextExecution,
})
if err != nil {
return fmt.Errorf("failed to create cron job: %w", libdal.TranslatePGError(err))
}
}
return nil
}

Expand All @@ -548,6 +554,10 @@ func (d *DAL) deploymentWillDeactivate(ctx context.Context, key model.Deployment
if err != nil {
return fmt.Errorf("could not remove subscriptions and subscribers: %w", err)
}
err = d.cronjobs.DeleteCronJobsForDeployment(ctx, key)
if err != nil {
return fmt.Errorf("could not delete cron jobs: %w", err)
}
return nil
}

Expand Down
Loading

0 comments on commit 48ed49c

Please sign in to comment.