Skip to content

Commit

Permalink
feat: timeline events for scheduled cron jobs (#2860)
Browse files Browse the repository at this point in the history
Adds an event to the timeline when a cron job is scheduled. No other
cron events are needed since the rest will be covered by async events.

<img width="1222" alt="Screenshot 2024-10-02 at 10 51 45 AM"
src="https://github.com/user-attachments/assets/3f62f6c2-5429-4c56-9e9f-71b88570e9b9">

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
safeer and github-actions[bot] authored Oct 2, 2024
1 parent 74c415b commit e534f24
Show file tree
Hide file tree
Showing 24 changed files with 1,132 additions and 533 deletions.
22 changes: 22 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
eventTypes = append(eventTypes, timeline.EventTypeDeploymentUpdated)
case pbconsole.EventType_EVENT_TYPE_INGRESS:
eventTypes = append(eventTypes, timeline.EventTypeIngress)
case pbconsole.EventType_EVENT_TYPE_CRON_SCHEDULED:
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
Expand Down Expand Up @@ -492,6 +494,26 @@ func eventDALToProto(event timeline.Event) *pbconsole.Event {
},
}

case *timeline.CronScheduledEvent:
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_CronScheduled{
CronScheduled: &pbconsole.CronScheduledEvent{
DeploymentKey: event.DeploymentKey.String(),
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Name: event.Verb.Name,
},
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
ScheduledAt: timestamppb.New(event.ScheduledAt),
Schedule: event.Schedule,
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
5 changes: 3 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
runnerScaling: runnerScaling,
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub
Expand All @@ -284,6 +282,9 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

// Use min, max backoff if we are running in production, otherwise use
Expand Down
61 changes: 45 additions & 16 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -20,24 +23,26 @@ import (
)

type Service struct {
key model.ControllerKey
requestSource string
dal dal.DAL
encryption *encryptionsvc.Service
clock clock.Clock
key model.ControllerKey
requestSource string
dal dal.DAL
encryption *encryptionsvc.Service
timelineService *timeline.Service
clock clock.Clock
}

func New(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, conn *sql.DB) *Service {
return NewForTesting(ctx, key, requestSource, encryption, *dal.New(conn), clock.New())
func New(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, timeline *timeline.Service, conn *sql.DB) *Service {
return NewForTesting(ctx, key, requestSource, encryption, timeline, *dal.New(conn), clock.New())
}

func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, dal dal.DAL, clock clock.Clock) *Service {
func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, timeline *timeline.Service, dal dal.DAL, clock clock.Clock) *Service {
svc := &Service{
key: key,
requestSource: requestSource,
dal: dal,
encryption: encryption,
clock: clock,
key: key,
requestSource: requestSource,
dal: dal,
encryption: encryption,
timelineService: timeline,
clock: clock,
}
return svc
}
Expand Down Expand Up @@ -115,8 +120,16 @@ func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
}
logger.Tracef("Scheduling %d cron jobs", len(jobs))
for _, job := range jobs {
err = s.scheduleCronJob(ctx, tx, job)
err = s.scheduleCronJob(ctx, tx, job, now)
if err != nil {
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: now,
ScheduledAt: job.NextExecution,
Schedule: job.Schedule,
Error: optional.Some(err.Error()),
})
return fmt.Errorf("failed to schedule cron job %q: %w", job.Key, err)
}
}
Expand All @@ -129,6 +142,7 @@ func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, failed bool) (err error) {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("Cron job %q completed with failed=%v", key, failed)
now := s.clock.Now().UTC()

tx, err := s.dal.Begin(ctx)
if err != nil {
Expand All @@ -140,15 +154,23 @@ func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, fai
if err != nil {
return fmt.Errorf("failed to get cron job %q: %w", key, err)
}
err = s.scheduleCronJob(ctx, tx, job)
err = s.scheduleCronJob(ctx, tx, job, now)
if err != nil {
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: now,
ScheduledAt: job.NextExecution,
Schedule: job.Schedule,
Error: optional.Some(err.Error()),
})
return fmt.Errorf("failed to schedule cron job %q: %w", key, err)
}
return nil
}

// scheduleCronJob schedules the next execution of a single cron job.
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.CronJob) error {
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.CronJob, startTime time.Time) error {
logger := log.FromContext(ctx).Scope("cron").Module(job.Verb.Module)
now := s.clock.Now().UTC()
pending, err := tx.IsCronJobPending(ctx, job.Key, now)
Expand Down Expand Up @@ -206,5 +228,12 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr
if err != nil {
return fmt.Errorf("failed to update cron job %q: %w", job.Key, err)
}
s.timelineService.EnqueueEvent(ctx, &timeline.CronScheduled{
DeploymentKey: job.DeploymentKey,
Verb: job.Verb,
Time: startTime,
ScheduledAt: nextAttemptForJob,
Schedule: job.Schedule,
})
return nil
}
5 changes: 4 additions & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -57,9 +58,11 @@ func TestNewCronJobsForModule(t *testing.T) {
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

// Progress so that start_time is valid
clk.Add(time.Second)
cjs := NewForTesting(ctx, key, "test.com", encryption, *dal, clk)
cjs := NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)
// All jobs need to be scheduled
expectUnscheduledJobs(t, dal, clk, 2)
unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- migrate:up

ALTER TYPE event_type ADD VALUE IF NOT EXISTS 'cron_scheduled';

-- migrate:down

76 changes: 76 additions & 0 deletions backend/controller/timeline/events_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package timeline

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/alecthomas/types/optional"

ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)

type CronScheduledEvent struct {
ID int64
Duration time.Duration
CronScheduled
}

func (e *CronScheduledEvent) GetID() int64 { return e.ID }
func (e *CronScheduledEvent) event() {}

type CronScheduled struct {
DeploymentKey model.DeploymentKey
Verb schema.Ref

Time time.Time
ScheduledAt time.Time
Schedule string
Error optional.Option[string]
}

func (*CronScheduled) inEvent() {}

type eventCronScheduledJSON struct {
DurationMS int64 `json:"duration_ms"`
ScheduledAt time.Time `json:"scheduled_at"`
Schedule string `json:"schedule"`
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduled) error {
cronJSON := eventCronScheduledJSON{
DurationMS: time.Since(event.Time).Milliseconds(),
ScheduledAt: event.ScheduledAt,
Schedule: event.Schedule,
Error: event.Error,
}

data, err := json.Marshal(cronJSON)
if err != nil {
return fmt.Errorf("failed to marshal cron JSON: %w", err)
}

var payload ftlencryption.EncryptedTimelineColumn
err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
if err != nil {
return fmt.Errorf("failed to encrypt cron JSON: %w", err)
}

err = libdal.TranslatePGError(querier.InsertTimelineCronScheduledEvent(ctx, sql.InsertTimelineCronScheduledEventParams{
DeploymentKey: event.DeploymentKey,
TimeStamp: event.Time,
Module: event.Verb.Module,
Verb: event.Verb.Name,
Payload: payload,
}))
if err != nil {
return fmt.Errorf("failed to insert cron event: %w", err)
}
return err
}
1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ VALUES (
sqlc.arg('payload')
);

-- name: InsertTimelineCronScheduledEvent :exec
INSERT INTO timeline (
deployment_id,
time_stamp,
type,
custom_key_1,
custom_key_2,
payload
)
VALUES (
(SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('time_stamp')::TIMESTAMPTZ,
'cron_scheduled',
sqlc.arg('module')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('payload')
);

-- name: DeleteOldTimelineEvents :one
WITH deleted AS (
DELETE FROM timeline
Expand Down
38 changes: 38 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions backend/controller/timeline/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,23 @@ func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.D
ResponseHeader: jsonPayload.ResponseHeader,
Error: jsonPayload.Error,
})
case sql.EventTypeCronScheduled:
var jsonPayload eventCronScheduledJSON
if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt cron scheduled event: %w", err)
}
out = append(out, &CronScheduledEvent{
ID: row.ID,
Duration: time.Duration(jsonPayload.DurationMS) * time.Millisecond,
CronScheduled: CronScheduled{
DeploymentKey: row.DeploymentKey,
Verb: schema.Ref{Module: row.CustomKey1.MustGet(), Name: row.CustomKey2.MustGet()},
Time: row.TimeStamp,
ScheduledAt: jsonPayload.ScheduledAt,
Schedule: jsonPayload.Schedule,
Error: jsonPayload.Error,
},
})

default:
panic("unknown event type: " + row.Type)
Expand Down
Loading

0 comments on commit e534f24

Please sign in to comment.