Skip to content

Commit

Permalink
fix cron sql getter, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer committed Aug 21, 2024
1 parent c196bb6 commit d2abe24
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
3 changes: 3 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if returnErr == nil {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
break

case dal.AsyncOriginFSM:
break

Expand Down
14 changes: 6 additions & 8 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -88,37 +87,36 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("New deployment; scheduling cron jobs")
_, err := s.scheduleCronJobs(ctx)
err := s.scheduleCronJobs(ctx)
if err != nil {
logger.Errorf(err, "failed to schedule cron jobs: %v", err)
}
}

// scheduleCronJobs schedules all cron jobs that are not already scheduled.
func (s *Service) scheduleCronJobs(ctx context.Context) (dur time.Duration, err error) {
func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
logger := log.FromContext(ctx).Scope("cron")
now := s.clock.Now().UTC()
next := now.Add(time.Hour)

tx, err := s.dal.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %w", err)
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

jobs, err := tx.GetUnscheduledCronJobs(ctx, now)
if err != nil {
return 0, fmt.Errorf("failed to get unscheduled cron jobs: %w", err)
return fmt.Errorf("failed to get unscheduled cron jobs: %w", err)
}
logger.Tracef("Scheduling %d cron jobs", len(jobs))
for _, job := range jobs {
err = s.scheduleCronJob(ctx, tx, job)
if err != nil {
return 0, fmt.Errorf("failed to schedule cron job %q: %w", job.Key, err)
return fmt.Errorf("failed to schedule cron job %q: %w", job.Key, err)
}
}

return next.Sub(now), nil
return nil
}

// OnJobCompletion is called by the controller when a cron job async call completes. We schedule
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/internal/integration"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
)

Expand Down
170 changes: 170 additions & 0 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package cronjobs

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

func TestNewCronJobsForModule(t *testing.T) {
t.Parallel()
ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

clk := clock.NewMock()
clk.Add(time.Second) // half way between cron job executions

key := model.NewControllerKey("localhost", strconv.Itoa(8080+1))
conn := sqltest.OpenForTesting(ctx, t)
dal := &DAL{db: sql.NewDB(conn)}

parentDAL, err := parentdal.New(ctx, conn, optional.None[string]())
assert.NoError(t, err)

moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute

deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []parentdal.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate)
assert.NoError(t, err)

err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

// Progress so that start_time is valid
clk.Add(time.Second)
cjs := NewForTesting(ctx, key, "test.com", *dal, clk)

// All jobs need to be scheduled
expectUnscheduledJobs(t, dal, clk, 2)
unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now())
assert.NoError(t, err)
assert.Equal(t, len(unscheduledJobs), 2)

// No async calls yet
_, err = parentDAL.AcquireAsyncCall(ctx)
assert.IsError(t, err, dalerrs.ErrNotFound)
assert.EqualError(t, err, "no pending async calls: not found")

err = cjs.scheduleCronJobs(ctx)
assert.NoError(t, err)

expectUnscheduledJobs(t, dal, clk, 0)

for _, job := range jobsToCreate {
j, err := dal.GetCronJobByKey(ctx, job.Key)
assert.NoError(t, err)
assert.Equal(t, job.StartTime, j.StartTime)
assert.Equal(t, j.NextExecution, clk.Now().Add(time.Second))

p, err := dal.db.IsCronJobPending(ctx, job.Key, job.StartTime)
assert.NoError(t, err)
assert.True(t, p)
}

// Now there should be async calls
calls := []*parentdal.AsyncCall{}
for i, job := range jobsToCreate {
call, err := parentDAL.AcquireAsyncCall(ctx)
assert.NoError(t, err)
assert.Equal(t, call.Verb, job.Verb.ToRefKey())
assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key))
assert.Equal(t, call.Request, []byte("{}"))
assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue

p, err := dal.db.IsCronJobPending(ctx, job.Key, job.StartTime)
assert.NoError(t, err)
assert.False(t, p)

calls = append(calls, call)
}

clk.Add(time.Second)

expectUnscheduledJobs(t, dal, clk, 0)

// Complete all calls
for _, call := range calls {
callResult := either.LeftOf[string]([]byte("{}"))
_, err = parentDAL.CompleteAsyncCall(ctx, call, callResult, func(tx *parentdal.Tx, isFinalResult bool) error {
return nil
})
assert.NoError(t, err)
}

clk.Add(time.Second)

expectUnscheduledJobs(t, dal, clk, 2)

// Use the completion handler to schedule the next execution
for _, call := range calls {
origin, ok := call.Origin.(parentdal.AsyncOriginCron)
assert.True(t, ok)
cjk, err := model.ParseCronJobKey(origin.CronJobKey)
assert.NoError(t, err)
err = cjs.OnJobCompletion(ctx, cjk, false)
assert.NoError(t, err)
}

expectUnscheduledJobs(t, dal, clk, 0)

for i, job := range jobsToCreate {
call, err := parentDAL.AcquireAsyncCall(ctx)
assert.NoError(t, err)
assert.Equal(t, call.Verb, job.Verb.ToRefKey())
assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key))
assert.Equal(t, call.Request, []byte("{}"))
assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue

assert.Equal(t, call.ScheduledAt, clk.Now())

p, err := dal.db.IsCronJobPending(ctx, job.Key, job.StartTime)
assert.NoError(t, err)
assert.False(t, p)
}
}

func expectUnscheduledJobs(t *testing.T, dal *DAL, clk *clock.Mock, count int) {
t.Helper()
unscheduledJobs, err := dal.GetUnscheduledCronJobs(context.Background(), clk.Now())
assert.NoError(t, err)
assert.Equal(t, len(unscheduledJobs), count)
}

func newCronJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Clock, count int) []model.CronJob {
t.Helper()
newJobs := []model.CronJob{}
for i := range count {
now := clock.Now()
pattern, err := cron.Parse(cronPattern)
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
newJobs = append(newJobs, model.CronJob{
Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%dCron", i)),
Verb: schema.Ref{Module: moduleName, Name: fmt.Sprintf("verb%dCron", i)},
Schedule: pattern.String(),
StartTime: now,
NextExecution: next,
})
}
return newJobs
}
6 changes: 3 additions & 3 deletions backend/controller/cronjobs/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob {
}
}

// GetUnscheduledCronJobs returns all cron jobs with start_time before provided startTime for
// deployments with min replicas > 0 with no async calls after last_execution
// GetUnscheduledCronJobs returns all cron_jobs rows with start_time before provided startTime for
// deployments with min replicas > 0 with no pending corresponding async_calls after last_execution
func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]model.CronJob, error) {
rows, err := d.db.GetUnscheduledCronJobs(ctx, startTime)
if err != nil {
Expand All @@ -88,7 +88,7 @@ func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) (
}), nil
}

// GetCronJobByKey returns a cron job by its key
// GetCronJobByKey returns a cron_job row by its key
func (d *DAL) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (model.CronJob, error) {
row, err := d.db.GetCronJobByKey(ctx, key)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion backend/controller/cronjobs/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ WHERE d.min_replicas > 0
FROM async_calls ac
WHERE
ac.cron_job_key = j.key
AND ac.scheduled_at > j.last_execution::TIMESTAMPTZ
AND (
ac.scheduled_at > j.last_execution OR
(ac.scheduled_at = j.last_execution AND ac.state IN ('pending', 'executing'))
)
)
)
FOR UPDATE SKIP LOCKED;
Expand Down
5 changes: 4 additions & 1 deletion backend/controller/cronjobs/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d2abe24

Please sign in to comment.