From aec59f84caed1975b68c5ed8eb431cfbbd0a2365 Mon Sep 17 00:00:00 2001 From: GuilhermBrSp Date: Fri, 17 Jun 2022 09:47:37 -0300 Subject: [PATCH] Use informer indexer for retrieve workqueue objects instead of custom storage. --- .../runtime/kubernetes/game_room_watcher.go | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/adapters/runtime/kubernetes/game_room_watcher.go b/internal/adapters/runtime/kubernetes/game_room_watcher.go index c386a8ba4..bcf925f1c 100644 --- a/internal/adapters/runtime/kubernetes/game_room_watcher.go +++ b/internal/adapters/runtime/kubernetes/game_room_watcher.go @@ -27,17 +27,17 @@ import ( "sync" "time" + "github.com/topfreegames/maestro/internal/core/entities" + "github.com/topfreegames/maestro/internal/core/ports" + "go.uber.org/zap" + "k8s.io/client-go/util/workqueue" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/topfreegames/maestro/internal/core/logs" - "go.uber.org/zap" - - "github.com/topfreegames/maestro/internal/core/entities" "github.com/topfreegames/maestro/internal/core/entities/game_room" - "github.com/topfreegames/maestro/internal/core/ports" v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" kube "k8s.io/client-go/kubernetes" @@ -60,12 +60,11 @@ type kubernetesWatcher struct { instanceCacheMap map[string]*game_room.Instance logger *zap.Logger queue workqueue.RateLimitingInterface - store cache.Store schedulerName string } type QueueItem struct { - obj interface{} + key string eventType game_room.InstanceEventType } @@ -78,11 +77,7 @@ func NewKubernetesWatcher(ctx context.Context, clientSet kube.Interface, schedul instanceCacheMap: map[string]*game_room.Instance{}, logger: zap.L(), queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - store: cache.NewStore(func(obj interface{}) (string, error) { - item := obj.(*QueueItem) - return cache.MetaNamespaceKeyFunc(item.obj) - }), - schedulerName: schedulerName, + schedulerName: schedulerName, } } @@ -187,11 +182,10 @@ func (kw *kubernetesWatcher) addFunc(obj interface{}) { if err != nil { return } - _ = kw.store.Add(&QueueItem{ - obj: obj, + kw.queue.Add(&QueueItem{ + key: key, eventType: game_room.InstanceEventTypeAdded, }) - kw.queue.Add(key) } func (kw *kubernetesWatcher) updateFunc(obj interface{}, newObj interface{}) { @@ -206,12 +200,10 @@ func (kw *kubernetesWatcher) updateFunc(obj interface{}, newObj interface{}) { return } - _ = kw.store.Add(&QueueItem{ - obj: newObj, + kw.queue.Add(&QueueItem{ + key: key, eventType: game_room.InstanceEventTypeUpdated, }) - kw.queue.Add(key) - } func (kw *kubernetesWatcher) deleteFunc(obj interface{}) { @@ -219,11 +211,10 @@ func (kw *kubernetesWatcher) deleteFunc(obj interface{}) { if err != nil { return } - _ = kw.store.Add(&QueueItem{ - obj: obj, + kw.queue.Add(&QueueItem{ + key: key, eventType: game_room.InstanceEventTypeDeleted, }) - kw.queue.Add(key) } func (k *kubernetes) WatchGameRoomInstances(ctx context.Context, scheduler *entities.Scheduler) (ports.RuntimeWatcher, error) { @@ -245,24 +236,35 @@ func (k *kubernetes) WatchGameRoomInstances(ctx context.Context, scheduler *enti // Same amount of routines reading from the channel for i := 0; i < 200; i++ { - go watcher.runWorker() + go watcher.runWorker(podsInformer) } return watcher, nil } -func (kw *kubernetesWatcher) runWorker() { +func (kw *kubernetesWatcher) runWorker(informer cache.SharedIndexInformer) { for { - key, quit := kw.queue.Get() + item, quit := kw.queue.Get() if quit { return } + queueItem := item.(*QueueItem) - item, _, _ := kw.store.GetByKey(key.(string)) - - convertedItem := item.(*QueueItem) - kw.processEvent(convertedItem.eventType, convertedItem.obj) - kw.queue.Forget(key) + obj, exists, err := informer.GetIndexer().GetByKey(queueItem.key) + if err != nil { + continue + } + if !exists && queueItem.eventType == game_room.InstanceEventTypeDeleted { + _, podName, err := cache.SplitMetaNamespaceKey(queueItem.key) + if err != nil { + continue + } + obj = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: podName}, + } + } - kw.queue.Done(key) + kw.processEvent(queueItem.eventType, obj) + kw.queue.Forget(queueItem) + kw.queue.Done(queueItem) } }