diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 76e9efebc..647499d22 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -15,6 +15,7 @@ import ( "time" "connectrpc.com/connect" + "github.com/alecthomas/atomic" "github.com/alecthomas/concurrency" "github.com/alecthomas/kong" "github.com/alecthomas/types/optional" @@ -164,8 +165,7 @@ type Service struct { // Map from endpoint to client. clients *ttlcache.Cache[string, clients] - routesMu sync.RWMutex - routes map[string][]dal.Route + routes atomic.Value[map[string][]dal.Route] config Config runnerScaling scaling.RunnerScaling @@ -192,11 +192,11 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), clients: ttlcache.New[string, clients](ttlcache.WithTTL[string, clients](time.Minute)), - routes: map[string][]dal.Route{}, config: config, runnerScaling: runnerScaling, increaseReplicaFailures: map[string]int{}, } + svc.routes.Store(map[string][]dal.Route{}) cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc @@ -303,8 +303,8 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR if err != nil { return nil, fmt.Errorf("could not get status: %w", err) } - s.routesMu.RLock() - routes := slices.FlatMap(maps.Values(s.routes), func(routes []dal.Route) (out []*ftlv1.StatusResponse_Route) { + sroutes := s.routes.Load() + routes := slices.FlatMap(maps.Values(sroutes), func(routes []dal.Route) (out []*ftlv1.StatusResponse_Route) { out = make([]*ftlv1.StatusResponse_Route, len(routes)) for i, route := range routes { out[i] = &ftlv1.StatusResponse_Route{ @@ -316,7 +316,6 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR } return out }) - s.routesMu.RUnlock() replicas := map[string]int32{} protoRunners, err := slices.MapErr(status.Runners, func(r dal.Runner) (*ftlv1.StatusResponse_Runner, error) { var deployment *string @@ -543,9 +542,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre } else if err != nil { return nil, err } - s.routesMu.Lock() - s.routes = routes - s.routesMu.Unlock() + s.routes.Store(routes) } if stream.Err() != nil { return nil, stream.Err() @@ -743,9 +740,7 @@ func (s *Service) callWithRequest( } module := verbRef.Module - s.routesMu.RLock() - routes, ok := s.routes[module] - s.routesMu.RUnlock() + routes, ok := s.routes.Load()[module] if !ok { return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module)) } @@ -1342,9 +1337,7 @@ func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) { } else if err != nil { return 0, err } - s.routesMu.Lock() - s.routes = routes - s.routesMu.Unlock() + s.routes.Store(routes) return time.Second, nil }