diff --git a/cmd/metricsreporter/wire.go b/cmd/metricsreporter/wire.go index 46291485d..56b7f528a 100644 --- a/cmd/metricsreporter/wire.go +++ b/cmd/metricsreporter/wire.go @@ -50,8 +50,9 @@ func provideMetricsReporterConfig(c config.Config) *workerconfigs.MetricsReporte var WorkerOptionsSet = wire.NewSet( service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, + service.NewSchedulerCacheRedis, provideMetricsReporterConfig, - wire.Struct(new(worker.WorkerOptions), "RoomStorage", "InstanceStorage", "MetricsReporterConfig")) + wire.Struct(new(worker.WorkerOptions), "RoomStorage", "InstanceStorage", "SchedulerCache", "MetricsReporterConfig")) func initializeMetricsReporter(c config.Config) (*workers.WorkersManager, error) { wire.Build( diff --git a/cmd/metricsreporter/wire_gen.go b/cmd/metricsreporter/wire_gen.go index 84d8b7021..ac889d66f 100644 --- a/cmd/metricsreporter/wire_gen.go +++ b/cmd/metricsreporter/wire_gen.go @@ -32,10 +32,15 @@ func initializeMetricsReporter(c config.Config) (*workers.WorkersManager, error) if err != nil { return nil, err } + schedulerCache, err := service.NewSchedulerCacheRedis(c) + if err != nil { + return nil, err + } metricsReporterConfig := provideMetricsReporterConfig(c) workerOptions := &worker.WorkerOptions{ RoomStorage: roomStorage, InstanceStorage: gameRoomInstanceStorage, + SchedulerCache: schedulerCache, MetricsReporterConfig: metricsReporterConfig, } workersManager := workers.NewWorkersManager(workerBuilder, c, schedulerStorage, workerOptions) @@ -56,4 +61,4 @@ func provideMetricsReporterConfig(c config.Config) *config2.MetricsReporterConfi } -var WorkerOptionsSet = wire.NewSet(service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, provideMetricsReporterConfig, wire.Struct(new(worker.WorkerOptions), "RoomStorage", "InstanceStorage", "MetricsReporterConfig")) +var WorkerOptionsSet = wire.NewSet(service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, provideMetricsReporterConfig, wire.Struct(new(worker.WorkerOptions), "RoomStorage", "InstanceStorage", "SchedulerCache", "MetricsReporterConfig")) diff --git a/internal/core/worker/metricsreporter/metrics_reporter_worker.go b/internal/core/worker/metricsreporter/metrics_reporter_worker.go index 17dc0e697..187217fe1 100644 --- a/internal/core/worker/metricsreporter/metrics_reporter_worker.go +++ b/internal/core/worker/metricsreporter/metrics_reporter_worker.go @@ -45,6 +45,7 @@ const WorkerName = "metrics_reporter" // MetricsReporterWorker is the service responsible producing periodic metrics. type MetricsReporterWorker struct { scheduler *entities.Scheduler + schedulerCache ports.SchedulerCache config *config.MetricsReporterConfig roomStorage ports.RoomStorage instanceStorage ports.GameRoomInstanceStorage @@ -56,6 +57,7 @@ type MetricsReporterWorker struct { func NewMetricsReporterWorker(scheduler *entities.Scheduler, opts *worker.WorkerOptions) worker.Worker { return &MetricsReporterWorker{ scheduler: scheduler, + schedulerCache: opts.SchedulerCache, config: opts.MetricsReporterConfig, roomStorage: opts.RoomStorage, instanceStorage: opts.InstanceStorage, @@ -77,6 +79,7 @@ func (w *MetricsReporterWorker) Start(ctx context.Context) error { w.Stop(w.workerContext) return nil case <-ticker.C: + w.syncScheduler(w.workerContext) w.reportInstanceMetrics() w.reportGameRoomMetrics() w.reportSchedulerMetrics() @@ -96,6 +99,18 @@ func (w *MetricsReporterWorker) IsRunning() bool { return w.workerContext != nil && w.workerContext.Err() == nil } +func (w *MetricsReporterWorker) syncScheduler(ctx context.Context) { + scheduler, err := w.schedulerCache.GetScheduler(ctx, w.scheduler.Name) + if err != nil { + w.logger.Error("Error loading scheduler", zap.Error(err)) + return + } + + if w.scheduler.Spec.Version != scheduler.Spec.Version { + w.scheduler = scheduler + } +} + func (w *MetricsReporterWorker) reportInstanceMetrics() { w.logger.Info("Reporting instance metrics") diff --git a/internal/core/worker/metricsreporter/metrics_reporter_worker_test.go b/internal/core/worker/metricsreporter/metrics_reporter_worker_test.go index a70a7359b..0368ca64e 100644 --- a/internal/core/worker/metricsreporter/metrics_reporter_worker_test.go +++ b/internal/core/worker/metricsreporter/metrics_reporter_worker_test.go @@ -50,6 +50,7 @@ func TestMetricsReporterWorker_StartProduceMetrics(t *testing.T) { mockCtl := gomock.NewController(t) roomStorage := mock.NewMockRoomStorage(mockCtl) instanceStorage := mock.NewMockGameRoomInstanceStorage(mockCtl) + schedulerCache := mock.NewMockSchedulerCache(mockCtl) ctx, cancelFunc := context.WithCancel(context.Background()) scheduler := &entities.Scheduler{Name: "random-scheduler"} instances := newInstancesList(40) @@ -58,6 +59,7 @@ func TestMetricsReporterWorker_StartProduceMetrics(t *testing.T) { RoomStorage: roomStorage, InstanceStorage: instanceStorage, MetricsReporterConfig: &config.MetricsReporterConfig{MetricsReporterIntervalMillis: 500}, + SchedulerCache: schedulerCache, } worker := NewMetricsReporterWorker(scheduler, workerOpts) @@ -79,6 +81,7 @@ func TestMetricsReporterWorker_StartProduceMetrics(t *testing.T) { roomStorage.EXPECT().GetRunningMatchesCount(gomock.Any(), scheduler.Name). Return(88, nil).MinTimes(3) instanceStorage.EXPECT().GetAllInstances(gomock.Any(), scheduler.Name).Return(instances, nil).MinTimes(3) + schedulerCache.EXPECT().GetScheduler(gomock.Any(), scheduler.Name).Return(scheduler, nil).MinTimes(3) go func() { err := worker.Start(ctx) @@ -116,6 +119,7 @@ func TestMetricsReporterWorker_StartDoNotProduceMetrics(t *testing.T) { mockCtl := gomock.NewController(t) roomStorage := mock.NewMockRoomStorage(mockCtl) instanceStorage := mock.NewMockGameRoomInstanceStorage(mockCtl) + schedulerCache := mock.NewMockSchedulerCache(mockCtl) ctx, cancelFunc := context.WithCancel(context.Background()) scheduler := &entities.Scheduler{Name: "random-scheduler-2"} @@ -124,6 +128,7 @@ func TestMetricsReporterWorker_StartDoNotProduceMetrics(t *testing.T) { RoomStorage: roomStorage, InstanceStorage: instanceStorage, MetricsReporterConfig: &config.MetricsReporterConfig{MetricsReporterIntervalMillis: 500}, + SchedulerCache: schedulerCache, } worker := NewMetricsReporterWorker(scheduler, workerOpts) @@ -145,6 +150,8 @@ func TestMetricsReporterWorker_StartDoNotProduceMetrics(t *testing.T) { Return(0, errors.New("some_error")).MinTimes(3) instanceStorage.EXPECT().GetAllInstances(gomock.Any(), scheduler.Name). Return([]*game_room.Instance{}, errors.New("some_error")).MinTimes(3) + schedulerCache.EXPECT().GetScheduler(gomock.Any(), scheduler.Name). + Return(scheduler, errors.New("some_error")).MinTimes(3) go func() { err := worker.Start(ctx) diff --git a/internal/core/worker/worker.go b/internal/core/worker/worker.go index 2f25dd533..33259ff89 100644 --- a/internal/core/worker/worker.go +++ b/internal/core/worker/worker.go @@ -56,6 +56,7 @@ type WorkerOptions struct { InstanceStorage ports.GameRoomInstanceStorage MetricsReporterConfig *config.MetricsReporterConfig RuntimeWatcherConfig *config.RuntimeWatcherConfig + SchedulerCache ports.SchedulerCache } // Configuration holds all worker configuration parameters.