From 126a273882432fc6c196c053cf8e0ee6cff1a136 Mon Sep 17 00:00:00 2001 From: Brandon Date: Wed, 7 Jul 2021 18:14:49 -0500 Subject: [PATCH] update batches to windows in the batcher --- pkg/controllers/allocation/batch.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/controllers/allocation/batch.go b/pkg/controllers/allocation/batch.go index 12c40823b987..3c5caba02aad 100644 --- a/pkg/controllers/allocation/batch.go +++ b/pkg/controllers/allocation/batch.go @@ -30,8 +30,8 @@ type Batcher struct { // It should be a smaller duration than MaxBatchPeriod IdlePeriod time.Duration - // batches keeps a mapping of a key (like a provisioner name and namespace) to a specific Batch - batches map[string]*Window + // windows keeps a mapping of a key (like a provisioner name and namespace) to a specific object's batch window + windows map[string]*Window // updates is a stream of obj keys that can start or provide a progress signal to an object's batch window updates chan string // isMonitorRunning indicates if the monitor go routine has been started @@ -50,7 +50,7 @@ func NewBatcher(maxBatchPeriod time.Duration, idlePeriod time.Duration) *Batcher return &Batcher{ MaxBatchPeriod: maxBatchPeriod, IdlePeriod: idlePeriod, - batches: map[string]*Window{}, + windows: map[string]*Window{}, updates: make(chan string, 1000), } } @@ -79,7 +79,7 @@ func (b *Batcher) Add(obj metav1.Object) { // If the batch is empty, it will block until something is added or the window times out // Wait should not be called concurrently for the same object but can be called concurrently for different objects func (b *Batcher) Wait(obj metav1.Object) { - batch, ok := b.batches[b.keyFrom(obj)] + batch, ok := b.windows[b.keyFrom(obj)] if !ok { return } @@ -90,7 +90,7 @@ func (b *Batcher) Wait(obj metav1.Object) { // Remove should only be called if there are no Add calls or Wait calls happening concurrently // After a Remove call for an object, a subsequent Add for the same object will recreate the window func (b *Batcher) Remove(obj metav1.Object) { - delete(b.batches, b.keyFrom(obj)) + delete(b.windows, b.keyFrom(obj)) } // monitor is a synchronous loop that controls the window start, update, and end @@ -100,9 +100,9 @@ func (b *Batcher) monitor(ctx context.Context) { ticker := time.NewTicker(time.Second * 1) for { select { - // Wake and check all batches for timed out windows + // Wake and check for any timed out batch windows case <-ticker.C: - for _, batch := range b.batches { + for _, batch := range b.windows { b.checkForWindowEndAndNotify(batch) } // Start a new window or update progress on a window @@ -133,13 +133,13 @@ func (b *Batcher) checkForWindowEndAndNotify(window *Window) { // startOrUpdateWindow starts a new window for the object key if one does not already exist // if a window already exists for the object key, then the lastUpdate time is set func (b *Batcher) startOrUpdateWindow(key string) { - if window, ok := b.batches[key]; ok { + if window, ok := b.windows[key]; ok { window.lastUpdated = time.Now() if window.started.IsZero() { window.started = time.Now() } } else { - b.batches[key] = &Window{lastUpdated: time.Now(), started: time.Now(), closed: make(chan bool, 1)} + b.windows[key] = &Window{lastUpdated: time.Now(), started: time.Now(), closed: make(chan bool, 1)} } }