From 5ed558d4005776a62b7fed46d90fa6425824e815 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 15 Oct 2024 14:36:32 +1100 Subject: [PATCH] fix: remove cron jobs on deactivation (#3122) fixes #3109 --- backend/controller/controller.go | 7 +- backend/controller/cronjobs/cronjobs.go | 31 +++--- .../cronjobs/cronjobs_integration_test.go | 40 ++++++++ .../cronjobs/{ => internal}/cronjobs_test.go | 40 +++++--- .../controller/cronjobs/internal/dal/dal.go | 8 ++ .../cronjobs/internal/sql/querier.go | 1 + .../cronjobs/internal/sql/queries.sql | 4 + .../cronjobs/internal/sql/queries.sql.go | 10 ++ backend/controller/dal/async_calls_test.go | 2 +- backend/controller/dal/dal.go | 48 ++++++---- backend/controller/dal/dal_test.go | 18 +++- backend/controller/dal/fsm_test.go | 4 +- .../controller/dal/internal/sql/querier.go | 1 + .../dal/internal/sql/queries.sql.go | 10 ++ backend/controller/timeline/events_call.go | 2 +- .../timeline/{ => internal}/timeline_test.go | 96 ++++++++++--------- 16 files changed, 224 insertions(+), 98 deletions(-) rename backend/controller/cronjobs/{ => internal}/cronjobs_test.go (85%) rename backend/controller/timeline/{ => internal}/timeline_test.go (76%) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 849e76d346..28f74a0078 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -279,13 +279,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca svc.registry = artefacts.New(conn) - svc.dal = dal.New(ctx, conn, encryption, pubSub) - timelineSvc := timeline.New(ctx, conn, encryption) svc.timeline = timelineSvc - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn) svc.cronJobs = cronSvc + svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc) svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc) @@ -1164,13 +1162,12 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl } ingressRoutes := extractIngressRoutingEntries(req.Msg) - cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema) if err != nil { logger.Errorf(err, "Could not generate cron jobs for new deployment") return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err) } - dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs) + dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes) if err != nil { logger.Errorf(err, "Could not create deployment") return nil, fmt.Errorf("could not create deployment: %w", err) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 89cda37ddd..df8d8b20fd 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -15,7 +15,6 @@ import ( encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/encryption/api" "github.com/TBD54566975/ftl/backend/controller/timeline" - schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -47,22 +46,22 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource s return svc } -func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) { +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schema.Module) ([]model.CronJob, error) { logger := log.FromContext(ctx).Scope("cron").Module(module.Name) start := s.clock.Now().UTC() newJobs := []model.CronJob{} merr := []error{} for _, decl := range module.Decls { - verb, ok := decl.Value.(*schemapb.Decl_Verb) + verb, ok := decl.(*schema.Verb) if !ok { continue } - for _, metadata := range verb.Verb.Metadata { - cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob) + for _, metadata := range verb.Metadata { + cronMetadata, ok := metadata.(*schema.MetadataCronJob) if !ok { continue } - cronStr := cronMetadata.CronJob.Cron + cronStr := cronMetadata.Cron schedule, err := cron.Parse(cronStr) if err != nil { merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, err)) @@ -70,12 +69,12 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod } next, err := cron.NextAfter(schedule, start, false) if err != nil { - merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err)) + merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Name, schedule, err)) continue } newJobs = append(newJobs, model.CronJob{ - Key: model.NewCronJobKey(module.Name, verb.Verb.Name), - Verb: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, + Key: model.NewCronJobKey(module.Name, verb.Name), + Verb: schema.Ref{Module: module.Name, Name: verb.Name}, Schedule: cronStr, StartTime: start, NextExecution: next, @@ -96,7 +95,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) error { logger := log.FromContext(ctx).Scope("cron") logger.Tracef("New deployment; scheduling cron jobs") - err := s.scheduleCronJobs(ctx) + err := s.ScheduleCronJobs(ctx) if err != nil { return fmt.Errorf("failed to schedule cron jobs: %w", err) } @@ -104,7 +103,7 @@ func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) error { } // scheduleCronJobs schedules all cron jobs that are not already scheduled. -func (s *Service) scheduleCronJobs(ctx context.Context) (err error) { +func (s *Service) ScheduleCronJobs(ctx context.Context) (err error) { logger := log.FromContext(ctx).Scope("cron") now := s.clock.Now().UTC() @@ -237,3 +236,13 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr }) return nil } + +func (s *Service) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error { + logger := log.FromContext(ctx).Scope("cron") + logger.Tracef("Deleting cron jobs for deployment %q", key) + err := s.dal.DeleteCronJobsForDeployment(ctx, key) + if err != nil { + return fmt.Errorf("failed to remove cron jobs for deployment %q: %w", key, err) + } + return nil +} diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go index 1b65cdb952..985dfb3bc2 100644 --- a/backend/controller/cronjobs/cronjobs_integration_test.go +++ b/backend/controller/cronjobs/cronjobs_integration_test.go @@ -5,6 +5,7 @@ package cronjobs import ( "os" "path/filepath" + "strings" "testing" "time" @@ -35,3 +36,42 @@ func TestCron(t *testing.T) { }, ) } + +func TestCronIsRemoved(t *testing.T) { + dir := t.TempDir() + // We want to make sure that cron jobs are shut down when the deployment is updated + // And we don't end up with double invocations + // To test this we are going to remove the cron and turn it into a normal verb + // If the verb is still invoked after the redeploy then we have a problem + tmpFile := filepath.Join(dir, "cron.txt") + t.Setenv("DEST_FILE", tmpFile) + + t.Cleanup(func() { _ = os.Remove(tmpFile) }) + + in.Run(t, + in.WithLanguages("go"), + in.CopyModule("cron"), + in.Deploy("cron"), + in.Wait("cron"), + in.Sleep(1*time.Second), + func(t testing.TB, ic in.TestContext) { + _, err := os.Stat(tmpFile) + assert.NoError(t, err) + data, err := os.ReadFile(tmpFile) + assert.NoError(t, err) + assert.True(t, strings.Contains(string(data), "Hello, world!")) + }, + in.EditFile("cron/cron.go", func(content []byte) []byte { + ret := strings.ReplaceAll(string(content), "//ftl:cron * * * * * * *", "//ftl:verb") + ret = strings.ReplaceAll(ret, "Hello, world!", "NEW VERB") + return []byte(ret) + }), + in.Deploy("cron"), + func(t testing.TB, ic in.TestContext) { + time.Sleep(2 * time.Second) + data, err := os.ReadFile(tmpFile) + assert.NoError(t, err) + assert.False(t, strings.Contains(string(data), "NEW VERB")) + }, + ) +} diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/internal/cronjobs_test.go similarity index 85% rename from backend/controller/cronjobs/cronjobs_test.go rename to backend/controller/cronjobs/internal/cronjobs_test.go index 37692900c9..4342764f6a 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/internal/cronjobs_test.go @@ -1,4 +1,4 @@ -package cronjobs +package internal import ( "context" @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/TBD54566975/ftl/backend/controller/async" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" @@ -37,7 +38,6 @@ func TestNewCronJobsForModule(t *testing.T) { clk := clock.NewMock() clk.Add(time.Second) // half way between cron job executions - key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) conn := sqltest.OpenForTesting(ctx, t) dal := dal.New(conn) @@ -45,27 +45,45 @@ func TestNewCronJobsForModule(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) assert.NoError(t, err) + key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) + timelineSrv := timeline.New(ctx, conn, encryption) + cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk) + scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - parentDAL := parentdal.New(ctx, conn, encryption, pubSub) + parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs) moduleName := "initial" jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute - - deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{ - Name: moduleName, - }, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate) + decls := []schema.Decl{} + for _, job := range jobsToCreate { + decls = append(decls, &schema.Verb{ + Name: job.Verb.Name, + Metadata: []schema.Metadata{&schema.MetadataCronJob{Cron: job.Schedule}}, + Request: &schema.Unit{}, + Response: &schema.Unit{}, + }) + } + moduleSchema := &schema.Module{ + Name: moduleName, + Decls: decls, + } + deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", moduleSchema, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}) assert.NoError(t, err) err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1) assert.NoError(t, err) - timelineSrv := timeline.New(ctx, conn, encryption) - // Progress so that start_time is valid clk.Add(time.Second) - cjs := NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk) // All jobs need to be scheduled expectUnscheduledJobs(t, dal, clk, 2) unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now()) + jobsByVerb := map[string]model.CronJob{} + for _, job := range unscheduledJobs { + jobsByVerb[job.Verb.Name] = job + } + for i := range jobsToCreate { + jobsToCreate[i].Key = jobsByVerb[jobsToCreate[i].Verb.Name].Key + } assert.NoError(t, err) assert.Equal(t, len(unscheduledJobs), 2) @@ -74,7 +92,7 @@ func TestNewCronJobsForModule(t *testing.T) { assert.IsError(t, err, libdal.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") - err = cjs.scheduleCronJobs(ctx) + err = cjs.ScheduleCronJobs(ctx) assert.NoError(t, err) expectUnscheduledJobs(t, dal, clk, 0) for _, job := range jobsToCreate { diff --git a/backend/controller/cronjobs/internal/dal/dal.go b/backend/controller/cronjobs/internal/dal/dal.go index 670ffe8d0c..a72e4ae739 100644 --- a/backend/controller/cronjobs/internal/dal/dal.go +++ b/backend/controller/cronjobs/internal/dal/dal.go @@ -100,3 +100,11 @@ func (d *DAL) UpdateCronJobExecution(ctx context.Context, params UpdateCronJobEx } return nil } + +func (d *DAL) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error { + err := d.db.DeleteCronJobsForDeployment(ctx, key) + if err != nil { + return fmt.Errorf("failed to delete cron jobs for deployment %v: %w", key, libdal.TranslatePGError(err)) + } + return nil +} diff --git a/backend/controller/cronjobs/internal/sql/querier.go b/backend/controller/cronjobs/internal/sql/querier.go index 05c77d7f4c..7890eadf23 100644 --- a/backend/controller/cronjobs/internal/sql/querier.go +++ b/backend/controller/cronjobs/internal/sql/querier.go @@ -15,6 +15,7 @@ type Querier interface { AsyncCallQueueDepth(ctx context.Context) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error + DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) diff --git a/backend/controller/cronjobs/internal/sql/queries.sql b/backend/controller/cronjobs/internal/sql/queries.sql index a3a3ec627a..cb1136326f 100644 --- a/backend/controller/cronjobs/internal/sql/queries.sql +++ b/backend/controller/cronjobs/internal/sql/queries.sql @@ -49,3 +49,7 @@ SELECT EXISTS ( AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ AND ac.state = 'pending' ) AS pending; + +-- name: DeleteCronJobsForDeployment :exec +DELETE FROM cron_jobs +WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1); \ No newline at end of file diff --git a/backend/controller/cronjobs/internal/sql/queries.sql.go b/backend/controller/cronjobs/internal/sql/queries.sql.go index 766d3a82a1..6d17718fc3 100644 --- a/backend/controller/cronjobs/internal/sql/queries.sql.go +++ b/backend/controller/cronjobs/internal/sql/queries.sql.go @@ -47,6 +47,16 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } +const deleteCronJobsForDeployment = `-- name: DeleteCronJobsForDeployment :exec +DELETE FROM cron_jobs +WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1) +` + +func (q *Queries) DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error { + _, err := q.db.ExecContext(ctx, deleteCronJobsForDeployment, deploymentKey) + return err +} + const getCronJobByKey = `-- name: GetCronJobByKey :one SELECT j.id, j.key, j.deployment_id, j.verb, j.schedule, j.start_time, j.next_execution, j.module_name, j.last_execution, j.last_async_call_id, d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, d.last_activated_at FROM cron_jobs j diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index 37434089c1..3d9220424c 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -26,7 +26,7 @@ func TestNoCallToAcquire(t *testing.T) { assert.NoError(t, err) scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - dal := New(ctx, conn, encryption, pubSub) + dal := New(ctx, conn, encryption, pubSub, nil) _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 5582e9f0d5..7f0ef5f27a 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/proto" aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" @@ -52,7 +53,7 @@ type Reservation interface { Rollback(ctx context.Context) error } -func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL { +func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service, cron *cronjobs.Service) *DAL { var d *DAL db := dalsql.New(conn) d = &DAL{ @@ -69,6 +70,7 @@ func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Ser encryption: d.encryption, registry: aregistry.New(h.Connection), DeploymentChanges: d.DeploymentChanges, + cronjobs: cron, } }), DeploymentChanges: inprocesspubsub.New[DeploymentNotification](), @@ -85,6 +87,7 @@ type DAL struct { pubsub *pubsub.Service encryption *encryption.Service registry *aregistry.Service + cronjobs *cronjobs.Service // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification] @@ -207,7 +210,7 @@ type IngressRoutingEntry struct { // previously created artefacts with it. // // If an existing deployment with identical artefacts exists, it is returned. -func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { +func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) { logger := log.FromContext(ctx) // Start the parent transaction @@ -303,23 +306,6 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem } } - for _, job := range cronJobs { - // Start time must be calculated by the caller rather than generated by db - // This ensures that nextExecution is after start time, otherwise the job will never be triggered - err := tx.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{ - Key: job.Key, - DeploymentKey: deploymentKey, - ModuleName: job.Verb.Module, - Verb: job.Verb.Name, - StartTime: job.StartTime, - Schedule: job.Schedule, - NextExecution: job.NextExecution, - }) - if err != nil { - return model.DeploymentKey{}, fmt.Errorf("failed to create cron job: %w", libdal.TranslatePGError(err)) - } - } - return deploymentKey, nil } @@ -537,6 +523,26 @@ func (d *DAL) deploymentWillActivate(ctx context.Context, key model.DeploymentKe if err != nil { return fmt.Errorf("could not create subscribers: %w", err) } + cronJobs, err := d.cronjobs.NewCronJobsForModule(ctx, module) + if err != nil { + return fmt.Errorf("could not create cron jobs: %w", err) + } + for _, job := range cronJobs { + // Start time must be calculated by the caller rather than generated by db + // This ensures that nextExecution is after start time, otherwise the job will never be triggered + err := d.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{ + Key: job.Key, + DeploymentKey: key, + ModuleName: job.Verb.Module, + Verb: job.Verb.Name, + StartTime: job.StartTime, + Schedule: job.Schedule, + NextExecution: job.NextExecution, + }) + if err != nil { + return fmt.Errorf("failed to create cron job: %w", libdal.TranslatePGError(err)) + } + } return nil } @@ -548,6 +554,10 @@ func (d *DAL) deploymentWillDeactivate(ctx context.Context, key model.Deployment if err != nil { return fmt.Errorf("could not remove subscriptions and subscribers: %w", err) } + err = d.cronjobs.DeleteCronJobsForDeployment(ctx, key) + if err != nil { + return fmt.Errorf("could not delete cron jobs: %w", err) + } return nil } diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 8c855fe0e2..d53b0bfb4f 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -3,7 +3,6 @@ package dal import ( "bytes" "context" - "github.com/TBD54566975/ftl/backend/controller/artefacts" "io" "sync" "testing" @@ -13,6 +12,10 @@ import ( "github.com/alecthomas/types/optional" "golang.org/x/sync/errgroup" + "github.com/TBD54566975/ftl/backend/controller/artefacts" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" + "github.com/TBD54566975/ftl/backend/controller/timeline" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/leases" @@ -34,7 +37,10 @@ func TestDAL(t *testing.T) { scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - dal := New(ctx, conn, encryption, pubSub) + timelineSrv := timeline.New(ctx, conn, encryption) + key := model.NewControllerKey("localhost", "8081") + cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn) + dal := New(ctx, conn, encryption, pubSub, cjs) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSHA = sha256.Sum(testContent) @@ -68,7 +74,7 @@ func TestDAL(t *testing.T) { Digest: testSha, Executable: true, Path: "dir/filename", - }}, nil, nil) + }}, nil) assert.NoError(t, err) }) @@ -194,7 +200,11 @@ func TestCreateArtefactConflict(t *testing.T) { scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - dal := New(ctx, conn, encryption, pubSub) + + timelineSrv := timeline.New(ctx, conn, encryption) + key := model.NewControllerKey("localhost", "8081") + cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn) + dal := New(ctx, conn, encryption, pubSub, cjs) idch := make(chan sha256.SHA256, 2) diff --git a/backend/controller/dal/fsm_test.go b/backend/controller/dal/fsm_test.go index 859f468365..bc8aa502c3 100644 --- a/backend/controller/dal/fsm_test.go +++ b/backend/controller/dal/fsm_test.go @@ -10,6 +10,7 @@ import ( "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/async" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/leases" leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dbleaser" @@ -30,7 +31,8 @@ func TestSendFSMEvent(t *testing.T) { scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - dal := New(ctx, conn, encryption, pubSub) + cron := cronjobs.New(ctx, model.ControllerKey{}, "test", encryption, nil, conn) + dal := New(ctx, conn, encryption, pubSub, cron) _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index 652f5a9391..0873932cd1 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -27,6 +27,7 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error + DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index 31990ea4b0..a778919623 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -218,6 +218,16 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } +const deleteCronJobsForDeployment = `-- name: DeleteCronJobsForDeployment :exec +DELETE FROM cron_jobs +WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1) +` + +func (q *Queries) DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error { + _, err := q.db.ExecContext(ctx, deleteCronJobsForDeployment, deploymentKey) + return err +} + const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN ( diff --git a/backend/controller/timeline/events_call.go b/backend/controller/timeline/events_call.go index 363ce746f4..cc981dea5a 100644 --- a/backend/controller/timeline/events_call.go +++ b/backend/controller/timeline/events_call.go @@ -149,7 +149,7 @@ func callToCallEvent(call *Call) *CallEvent { } } -func callEventToCall(event *CallEvent) *Call { +func CallEventToCallForTesting(event *CallEvent) *Call { var response either.Either[*ftlv1.CallResponse, error] if eventErr, ok := event.Error.Get(); ok { response = either.RightOf[*ftlv1.CallResponse](errors.New(eventErr)) diff --git a/backend/controller/timeline/timeline_test.go b/backend/controller/timeline/internal/timeline_test.go similarity index 76% rename from backend/controller/timeline/timeline_test.go rename to backend/controller/timeline/internal/timeline_test.go index 5821171d8c..a044880c17 100644 --- a/backend/controller/timeline/timeline_test.go +++ b/backend/controller/timeline/internal/timeline_test.go @@ -1,4 +1,4 @@ -package timeline +package internal import ( "bytes" @@ -8,14 +8,17 @@ import ( "net/http" "net/url" "reflect" + "strconv" "testing" "time" - "github.com/TBD54566975/ftl/backend/controller/artefacts" - "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/artefacts" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" + timeline2 "github.com/TBD54566975/ftl/backend/controller/timeline" + controllerdal "github.com/TBD54566975/ftl/backend/controller/dal" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" @@ -35,11 +38,14 @@ func TestTimeline(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - timeline := New(ctx, conn, encryption) + timeline := timeline2.New(ctx, conn, encryption) registry := artefacts.New(conn) scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) + + key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) + cjs := cronjobs.New(ctx, key, "test.com", encryption, timeline, conn) + controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, cjs) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) @@ -62,7 +68,7 @@ func TestTimeline(t *testing.T) { Digest: testSha, Executable: true, Path: "dir/filename", - }}, nil, nil) + }}, nil) assert.NoError(t, err) }) @@ -77,7 +83,7 @@ func TestTimeline(t *testing.T) { assert.NoError(t, err) }) - callEvent := &CallEvent{ + callEvent := &timeline2.CallEvent{ Time: time.Now().Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -87,13 +93,13 @@ func TestTimeline(t *testing.T) { } t.Run("InsertCallEvent", func(t *testing.T) { - call := callEventToCall(callEvent) + call := timeline2.CallEventToCallForTesting(callEvent) timeline.EnqueueEvent(ctx, call) time.Sleep(200 * time.Millisecond) }) - logEvent := &LogEvent{ - Log: Log{ + logEvent := &timeline2.LogEvent{ + Log: timeline2.Log{ Time: time.Now().Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -107,7 +113,7 @@ func TestTimeline(t *testing.T) { time.Sleep(200 * time.Millisecond) }) - ingressEvent := &IngressEvent{ + ingressEvent := &timeline2.IngressEvent{ DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), Verb: schema.Ref{Module: "echo", Name: "echo"}, @@ -122,7 +128,7 @@ func TestTimeline(t *testing.T) { } t.Run("InsertHTTPIngressEvent", func(t *testing.T) { - timeline.EnqueueEvent(ctx, &Ingress{ + timeline.EnqueueEvent(ctx, &timeline2.Ingress{ DeploymentKey: ingressEvent.DeploymentKey, RequestKey: ingressEvent.RequestKey.MustGet(), StartTime: ingressEvent.Time, @@ -142,8 +148,8 @@ func TestTimeline(t *testing.T) { time.Sleep(200 * time.Millisecond) }) - cronEvent := &CronScheduledEvent{ - CronScheduled: CronScheduled{ + cronEvent := &timeline2.CronScheduledEvent{ + CronScheduled: timeline2.CronScheduled{ DeploymentKey: deploymentKey, Verb: schema.Ref{Module: "time", Name: "time"}, Time: time.Now().Round(time.Millisecond), @@ -154,7 +160,7 @@ func TestTimeline(t *testing.T) { } t.Run("InsertCronScheduledEvent", func(t *testing.T) { - timeline.EnqueueEvent(ctx, &CronScheduled{ + timeline.EnqueueEvent(ctx, &timeline2.CronScheduled{ DeploymentKey: cronEvent.DeploymentKey, Verb: cronEvent.Verb, Time: cronEvent.Time, @@ -165,7 +171,7 @@ func TestTimeline(t *testing.T) { time.Sleep(200 * time.Millisecond) }) - expectedDeploymentUpdatedEvent := &DeploymentUpdatedEvent{ + expectedDeploymentUpdatedEvent := &timeline2.DeploymentUpdatedEvent{ DeploymentKey: deploymentKey, MinReplicas: 1, } @@ -180,48 +186,48 @@ func TestTimeline(t *testing.T) { t.Run("NoFilters", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events) + assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events) }) t.Run("ByDeployment", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterDeployments(deploymentKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events) + assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events) }) t.Run("ByCall", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterTypes(timeline2.EventTypeCall), timeline2.FilterCall(optional.None[string](), "time", optional.None[string]())) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent}, events) + assertEventsEqual(t, []timeline2.Event{callEvent}, events) }) t.Run("ByModule", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeIngress), FilterModule("echo", optional.None[string]())) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterTypes(timeline2.EventTypeIngress), timeline2.FilterModule("echo", optional.None[string]())) assert.NoError(t, err) - assertEventsEqual(t, []Event{ingressEvent}, events) + assertEventsEqual(t, []timeline2.Event{ingressEvent}, events) }) t.Run("ByModuleWithVerb", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeIngress), FilterModule("echo", optional.Some("echo"))) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterTypes(timeline2.EventTypeIngress), timeline2.FilterModule("echo", optional.Some("echo"))) assert.NoError(t, err) - assertEventsEqual(t, []Event{ingressEvent}, events) + assertEventsEqual(t, []timeline2.Event{ingressEvent}, events) }) t.Run("ByLogLevel", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterTypes(timeline2.EventTypeLog), timeline2.FilterLogLevel(log.Trace)) assert.NoError(t, err) - assertEventsEqual(t, []Event{logEvent}, events) + assertEventsEqual(t, []timeline2.Event{logEvent}, events) }) t.Run("ByRequests", func(t *testing.T) { - events, err := timeline.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) + events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterRequests(requestKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent, logEvent, ingressEvent}, events) + assertEventsEqual(t, []timeline2.Event{callEvent, logEvent, ingressEvent}, events) }) }) } -func normaliseEvents(events []Event) []Event { +func normaliseEvents(events []timeline2.Event) []timeline2.Event { for i := range events { event := events[i] re := reflect.Indirect(reflect.ValueOf(event)) @@ -235,7 +241,7 @@ func normaliseEvents(events []Event) []Event { return events } -func assertEventsEqual(t *testing.T, expected, actual []Event) { +func assertEventsEqual(t *testing.T, expected, actual []timeline2.Event) { t.Helper() assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual), assert.Exclude[time.Duration](), assert.Exclude[time.Time]()) } @@ -246,11 +252,11 @@ func TestDeleteOldEvents(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - timeline := New(ctx, conn, encryption) + timeline := timeline2.New(ctx, conn, encryption) registry := artefacts.New(conn) scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) - controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) + controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, nil) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSha sha256.SHA256 @@ -267,13 +273,13 @@ func TestDeleteOldEvents(t *testing.T) { Digest: testSha, Executable: true, Path: "dir/filename", - }}, nil, nil) + }}, nil) assert.NoError(t, err) }) requestKey := model.NewRequestKey(model.OriginIngress, "GET /test") // week old event - callEvent := &CallEvent{ + callEvent := &timeline2.CallEvent{ Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -283,12 +289,12 @@ func TestDeleteOldEvents(t *testing.T) { } t.Run("InsertCallEvent", func(t *testing.T) { - call := callEventToCall(callEvent) + call := timeline2.CallEventToCallForTesting(callEvent) timeline.EnqueueEvent(ctx, call) time.Sleep(200 * time.Millisecond) }) // hour old event - callEvent = &CallEvent{ + callEvent = &timeline2.CallEvent{ Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -297,14 +303,14 @@ func TestDeleteOldEvents(t *testing.T) { DestVerb: schema.Ref{Module: "time", Name: "time"}, } t.Run("InsertCallEvent", func(t *testing.T) { - call := callEventToCall(callEvent) + call := timeline2.CallEventToCallForTesting(callEvent) timeline.EnqueueEvent(ctx, call) time.Sleep(200 * time.Millisecond) }) // week old event - logEvent := &LogEvent{ - Log: Log{ + logEvent := &timeline2.LogEvent{ + Log: timeline2.Log{ Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -319,8 +325,8 @@ func TestDeleteOldEvents(t *testing.T) { }) // hour old event - logEvent = &LogEvent{ - Log: Log{ + logEvent = &timeline2.LogEvent{ + Log: timeline2.Log{ Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), DeploymentKey: deploymentKey, RequestKey: optional.Some(requestKey), @@ -335,15 +341,15 @@ func TestDeleteOldEvents(t *testing.T) { }) t.Run("DeleteOldEvents", func(t *testing.T) { - count, err := timeline.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) + count, err := timeline.DeleteOldEvents(ctx, timeline2.EventTypeCall, 2*24*time.Hour) assert.NoError(t, err) assert.Equal(t, int64(1), count) - count, err = timeline.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + count, err = timeline.DeleteOldEvents(ctx, timeline2.EventTypeLog, time.Minute) assert.NoError(t, err) assert.Equal(t, int64(2), count) - count, err = timeline.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + count, err = timeline.DeleteOldEvents(ctx, timeline2.EventTypeLog, time.Minute) assert.NoError(t, err) assert.Equal(t, int64(0), count) })