Skip to content

Commit

Permalink
fix: cron removed on module change
Browse files Browse the repository at this point in the history
fixes #3503
  • Loading branch information
stuartwdouglas committed Nov 25, 2024
1 parent 755c2a5 commit f862a35
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch
logger := log.FromContext(ctx).Scope("cron")
// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
currentDeploymentForModule := map[string]string{}
// Cron jobs ordered by next execution.
cronQueue := []cronJob{}

Expand All @@ -87,15 +88,17 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch
next, ok := scheduleNext(cronQueue)
var nextCh <-chan time.Time
if ok {
logger.Tracef("Next cron job scheduled in %s", next)
logger.Debugf("Next cron job scheduled in %s", next)
nextCh = time.After(next)
} else {
logger.Debugf("No cron jobs scheduled")
}
select {
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())

case resp := <-changes:
if err := updateCronJobs(cronJobs, resp); err != nil {
if err := updateCronJobs(ctx, cronJobs, currentDeploymentForModule, resp); err != nil {
logger.Errorf(err, "Failed to update cron jobs")
continue
}
Expand Down Expand Up @@ -162,12 +165,20 @@ func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
return time.Until(cronQueue[0].next), true
}

func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error {
func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, currentDeploymentsForModule map[string]string, resp *ftlv1.PullSchemaResponse) error {
logger := log.FromContext(ctx).Scope("cron")
switch resp.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
if currentDeploymentsForModule[resp.ModuleName] != resp.DeploymentKey {
logger.Debugf("Not removing cron jobs for %s as it is outdated", resp.ModuleName)
return nil
}
logger.Debugf("Removing cron jobs for module %s", resp.ModuleName)
delete(cronJobs, resp.ModuleName)

case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
logger.Debugf("Updated cron jobs for module %s", resp.ModuleName)
currentDeploymentsForModule[resp.ModuleName] = resp.DeploymentKey
moduleSchema, err := schema.ModuleFromProto(resp.Schema)
if err != nil {
return fmt.Errorf("failed to extract module schema: %w", err)
Expand All @@ -176,6 +187,7 @@ func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaRespons
if err != nil {
return fmt.Errorf("failed to extract cron jobs: %w", err)
}
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), resp.ModuleName)
cronJobs[resp.ModuleName] = moduleJobs
}
return nil
Expand Down

0 comments on commit f862a35

Please sign in to comment.