Skip to content

Commit

Permalink
fix: cron removed on module change
Browse files Browse the repository at this point in the history
fixes #3503
  • Loading branch information
stuartwdouglas committed Nov 25, 2024
1 parent 4a83ccd commit 13136c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
21 changes: 18 additions & 3 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch
logger := log.FromContext(ctx).Scope("cron")
// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
currentDeploymentForModule := map[string]string{}
// Cron jobs ordered by next execution.
cronQueue := []cronJob{}

Expand All @@ -87,15 +88,17 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch
next, ok := scheduleNext(cronQueue)
var nextCh <-chan time.Time
if ok {
logger.Tracef("Next cron job scheduled in %s", next)
logger.Debugf("Next cron job scheduled in %s", next)
nextCh = time.After(next)
} else {
logger.Debugf("No cron jobs scheduled")
}
select {
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())

case resp := <-changes:
if err := updateCronJobs(cronJobs, resp); err != nil {
if err := updateCronJobs(ctx, cronJobs, currentDeploymentForModule, resp); err != nil {
logger.Errorf(err, "Failed to update cron jobs")
continue
}
Expand Down Expand Up @@ -162,12 +165,23 @@ func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
return time.Until(cronQueue[0].next), true
}

func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error {
func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, currentDeploymentsForModule map[string]string, resp *ftlv1.PullSchemaResponse) error {
logger := log.FromContext(ctx).Scope("cron")
switch resp.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if currentDeploymentsForModule[resp.ModuleName] != resp.DeploymentKey {
logger.Debugf("Not removing cron jobs for %s as it is outdated", resp.DeploymentKey)
return nil
}
logger.Debugf("Removing cron jobs for module %s", resp.ModuleName)
delete(currentDeploymentsForModule, resp.ModuleName)
delete(cronJobs, resp.ModuleName)

case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
logger.Debugf("Updated cron jobs for module %s", resp.ModuleName)
currentDeploymentsForModule[resp.ModuleName] = resp.DeploymentKey
moduleSchema, err := schema.ModuleFromProto(resp.Schema)
if err != nil {
return fmt.Errorf("failed to extract module schema: %w", err)
Expand All @@ -176,6 +190,7 @@ func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaRespons
if err != nil {
return fmt.Errorf("failed to extract cron jobs: %w", err)
}
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), resp.ModuleName)
cronJobs[resp.ModuleName] = moduleJobs
}
return nil
Expand Down
27 changes: 21 additions & 6 deletions backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
svc := &service{
callClient: verbClient,
}
svc.schemaState.Store(schemaState{
currentDeploymentsForModule: make(map[string]string),
})

ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress")
if len(config.AllowOrigins) > 0 {
Expand All @@ -81,8 +84,12 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error {
existing := svc.schemaState.Load().protoSchema
newState := schemaState{
protoSchema: &schemapb.Schema{},
httpRoutes: make(map[string][]ingressRoute),
protoSchema: &schemapb.Schema{},
httpRoutes: make(map[string][]ingressRoute),
currentDeploymentsForModule: make(map[string]string),
}
for k, v := range svc.schemaState.Load().currentDeploymentsForModule {
newState.currentDeploymentsForModule[k] = v
}
if resp.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
found := false
Expand All @@ -100,13 +107,20 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
newState.protoSchema.Modules = append(newState.protoSchema.Modules, resp.Schema)
}
} else if existing != nil {
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if newState.currentDeploymentsForModule[resp.ModuleName] != resp.DeploymentKey {
logger.Debugf("Not removing ingress for %s as it is outdated", resp.DeploymentKey)
return nil
}
delete(newState.currentDeploymentsForModule, resp.ModuleName)
for i := range existing.Modules {
if existing.Modules[i].Name != resp.ModuleName {
newState.protoSchema.Modules = append(newState.protoSchema.Modules, existing.Modules[i])
}
}
}

newState.currentDeploymentsForModule[resp.ModuleName] = resp.DeploymentKey
newState.httpRoutes = extractIngressRoutingEntries(newState.protoSchema)
sch, err := schema.FromProto(newState.protoSchema)
if err != nil {
Expand Down Expand Up @@ -146,9 +160,10 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

type schemaState struct {
protoSchema *schemapb.Schema
schema *schema.Schema
httpRoutes map[string][]ingressRoute
protoSchema *schemapb.Schema
schema *schema.Schema
httpRoutes map[string][]ingressRoute
currentDeploymentsForModule map[string]string
}

type ingressRoute struct {
Expand Down

0 comments on commit 13136c3

Please sign in to comment.