Skip to content

Commit

Permalink
Use informer indexer for retrieve workqueue objects instead of custom…
Browse files Browse the repository at this point in the history
… storage. (#473)

Use the pod's informer indexer to retrieve workqueue objects instead of custom storage. Clean resource handlers to only be responsible for enqueuing object keys and not performing any other action.
  • Loading branch information
guilhermocc authored Jun 17, 2022
1 parent 12cc04f commit 4e96e3f
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions internal/adapters/runtime/kubernetes/game_room_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
"sync"
"time"

"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/ports"
"go.uber.org/zap"

"k8s.io/client-go/util/workqueue"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/topfreegames/maestro/internal/core/logs"

"go.uber.org/zap"

"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/entities/game_room"
"github.com/topfreegames/maestro/internal/core/ports"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
kube "k8s.io/client-go/kubernetes"
Expand All @@ -60,12 +60,11 @@ type kubernetesWatcher struct {
instanceCacheMap map[string]*game_room.Instance
logger *zap.Logger
queue workqueue.RateLimitingInterface
store cache.Store
schedulerName string
}

type QueueItem struct {
obj interface{}
key string
eventType game_room.InstanceEventType
}

Expand All @@ -78,11 +77,7 @@ func NewKubernetesWatcher(ctx context.Context, clientSet kube.Interface, schedul
instanceCacheMap: map[string]*game_room.Instance{},
logger: zap.L(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
store: cache.NewStore(func(obj interface{}) (string, error) {
item := obj.(*QueueItem)
return cache.MetaNamespaceKeyFunc(item.obj)
}),
schedulerName: schedulerName,
schedulerName: schedulerName,
}
}

Expand Down Expand Up @@ -187,11 +182,10 @@ func (kw *kubernetesWatcher) addFunc(obj interface{}) {
if err != nil {
return
}
_ = kw.store.Add(&QueueItem{
obj: obj,
kw.queue.Add(&QueueItem{
key: key,
eventType: game_room.InstanceEventTypeAdded,
})
kw.queue.Add(key)
}

func (kw *kubernetesWatcher) updateFunc(obj interface{}, newObj interface{}) {
Expand All @@ -206,24 +200,21 @@ func (kw *kubernetesWatcher) updateFunc(obj interface{}, newObj interface{}) {
return
}

_ = kw.store.Add(&QueueItem{
obj: newObj,
kw.queue.Add(&QueueItem{
key: key,
eventType: game_room.InstanceEventTypeUpdated,
})
kw.queue.Add(key)

}

func (kw *kubernetesWatcher) deleteFunc(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
_ = kw.store.Add(&QueueItem{
obj: obj,
kw.queue.Add(&QueueItem{
key: key,
eventType: game_room.InstanceEventTypeDeleted,
})
kw.queue.Add(key)
}

func (k *kubernetes) WatchGameRoomInstances(ctx context.Context, scheduler *entities.Scheduler) (ports.RuntimeWatcher, error) {
Expand All @@ -245,24 +236,35 @@ func (k *kubernetes) WatchGameRoomInstances(ctx context.Context, scheduler *enti

// Same amount of routines reading from the channel
for i := 0; i < 200; i++ {
go watcher.runWorker()
go watcher.runWorker(podsInformer)
}
return watcher, nil
}

func (kw *kubernetesWatcher) runWorker() {
func (kw *kubernetesWatcher) runWorker(informer cache.SharedIndexInformer) {
for {
key, quit := kw.queue.Get()
item, quit := kw.queue.Get()
if quit {
return
}
queueItem := item.(*QueueItem)

item, _, _ := kw.store.GetByKey(key.(string))

convertedItem := item.(*QueueItem)
kw.processEvent(convertedItem.eventType, convertedItem.obj)
kw.queue.Forget(key)
obj, exists, err := informer.GetIndexer().GetByKey(queueItem.key)
if err != nil {
continue
}
if !exists && queueItem.eventType == game_room.InstanceEventTypeDeleted {
_, podName, err := cache.SplitMetaNamespaceKey(queueItem.key)
if err != nil {
continue
}
obj = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: podName},
}
}

kw.queue.Done(key)
kw.processEvent(queueItem.eventType, obj)
kw.queue.Forget(queueItem)
kw.queue.Done(queueItem)
}
}

0 comments on commit 4e96e3f

Please sign in to comment.