Skip to content

Commit

Permalink
update batches to windows in the batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
bwagner5 committed Jul 7, 2021
1 parent 5577ed3 commit 126a273
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions pkg/controllers/allocation/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Batcher struct {
// It should be a smaller duration than MaxBatchPeriod
IdlePeriod time.Duration

// batches keeps a mapping of a key (like a provisioner name and namespace) to a specific Batch
batches map[string]*Window
// windows keeps a mapping of a key (like a provisioner name and namespace) to a specific object's batch window
windows map[string]*Window
// updates is a stream of obj keys that can start or provide a progress signal to an object's batch window
updates chan string
// isMonitorRunning indicates if the monitor go routine has been started
Expand All @@ -50,7 +50,7 @@ func NewBatcher(maxBatchPeriod time.Duration, idlePeriod time.Duration) *Batcher
return &Batcher{
MaxBatchPeriod: maxBatchPeriod,
IdlePeriod: idlePeriod,
batches: map[string]*Window{},
windows: map[string]*Window{},
updates: make(chan string, 1000),
}
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (b *Batcher) Add(obj metav1.Object) {
// If the batch is empty, it will block until something is added or the window times out
// Wait should not be called concurrently for the same object but can be called concurrently for different objects
func (b *Batcher) Wait(obj metav1.Object) {
batch, ok := b.batches[b.keyFrom(obj)]
batch, ok := b.windows[b.keyFrom(obj)]
if !ok {
return
}
Expand All @@ -90,7 +90,7 @@ func (b *Batcher) Wait(obj metav1.Object) {
// Remove should only be called if there are no Add calls or Wait calls happening concurrently
// After a Remove call for an object, a subsequent Add for the same object will recreate the window
func (b *Batcher) Remove(obj metav1.Object) {
delete(b.batches, b.keyFrom(obj))
delete(b.windows, b.keyFrom(obj))
}

// monitor is a synchronous loop that controls the window start, update, and end
Expand All @@ -100,9 +100,9 @@ func (b *Batcher) monitor(ctx context.Context) {
ticker := time.NewTicker(time.Second * 1)
for {
select {
// Wake and check all batches for timed out windows
// Wake and check for any timed out batch windows
case <-ticker.C:
for _, batch := range b.batches {
for _, batch := range b.windows {
b.checkForWindowEndAndNotify(batch)
}
// Start a new window or update progress on a window
Expand Down Expand Up @@ -133,13 +133,13 @@ func (b *Batcher) checkForWindowEndAndNotify(window *Window) {
// startOrUpdateWindow starts a new window for the object key if one does not already exist
// if a window already exists for the object key, then the lastUpdate time is set
func (b *Batcher) startOrUpdateWindow(key string) {
if window, ok := b.batches[key]; ok {
if window, ok := b.windows[key]; ok {
window.lastUpdated = time.Now()
if window.started.IsZero() {
window.started = time.Now()
}
} else {
b.batches[key] = &Window{lastUpdated: time.Now(), started: time.Now(), closed: make(chan bool, 1)}
b.windows[key] = &Window{lastUpdated: time.Now(), started: time.Now(), closed: make(chan bool, 1)}
}
}

Expand Down

0 comments on commit 126a273

Please sign in to comment.