Skip to content

Commit

Permalink
pausing
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Jan 4, 2022
1 parent 0b86c9d commit 5c9cdfe
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 26 deletions.
31 changes: 16 additions & 15 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var (
MaxBatchDuration = time.Second * 10
MaxBatchDuration = time.Second * 10
BatchIdleDuration = time.Second * 1
// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
MaxItemsPerBatch = 2_000
Expand Down Expand Up @@ -52,17 +52,20 @@ func NewBatcher(running context.Context) *Batcher {
// Add an item to the batch, returning the next gate which the caller may block
// on. The gate is protected by a read-write mutex, and may be modified by
// Flush(), which makes a new gate.
func (b *Batcher) Add(ctx context.Context, item interface{}) <-chan struct{} {
b.RLock()
defer b.RUnlock()
//
// In rare scenarios, if a goroutine hangs after enqueueing but before acquiring
// the gate lock, the batch could be flushed, resulting in the pod waiting on
// the next gate. This will be flushed on the next batch, and may result in
// delayed retries for the individual pod if the provisioning loop fails. In
// practice, this won't be encountered because this time window is on the order of seconds.
func (b *Batcher) Add(item interface{}) <-chan struct{} {
select {
case b.queue <- item:
return b.gate.Done()
case <-b.gate.Done():
return b.gate.Done()
case <-ctx.Done():
return ctx.Done()
case <-b.running.Done():
}
b.RLock()
defer b.RUnlock()
return b.gate.Done()
}

// Flush all goroutines blocking on the current gate and create a new gate.
Expand All @@ -73,8 +76,8 @@ func (b *Batcher) Flush() {
b.gate, b.flush = context.WithCancel(b.running)
}

// Wait returns a slice of enqueued items after idle or timeout
func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
// Wait starts a batching window and returns a slice of items when the window closes.
func (b *Batcher) Wait() (items []interface{}, window time.Duration) {
// Start the batching window after the first item is received
items = append(items, <-b.queue)
start := time.Now()
Expand All @@ -88,11 +91,9 @@ func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Du
return
}
select {
case task := <-b.queue:
case item := <-b.queue:
idle.Reset(BatchIdleDuration)
items = append(items, task)
case <-ctx.Done():
return
items = append(items, item)
case <-timeout.C:
return
case <-idle.C:
Expand Down
18 changes: 9 additions & 9 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
packer: binpacking.NewPacker(kubeClient, cloudProvider),
}
go func() {
for ctx.Err() == nil {
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
for running.Err() == nil {
if err := p.provision(running); err != nil {
logging.FromContext(running).Errorf("Provisioning failed, %s", err.Error())
}
}
logging.FromContext(ctx).Info("Stopped provisioner")
logging.FromContext(running).Info("Stopped provisioner")
}()
return p
}
Expand All @@ -75,16 +75,16 @@ type Provisioner struct {
packer *binpacking.Packer
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
return p.batcher.Add(ctx, pod)
// Add a pod to the provisioner and return a channel to block on. The caller is
// responsible for verifying that the pod was scheduled correctly.
func (p *Provisioner) Add(pod *v1.Pod) <-chan struct{} {
return p.batcher.Add(pod)
}

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Batch pods
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait(ctx)
items, window := p.batcher.Wait()
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
Expand Down
7 changes: 5 additions & 2 deletions pkg/controllers/selection/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
return reconcile.Result{}, err
}
return reconcile.Result{RequeueAfter: time.Second * 1}, nil
return reconcile.Result{RequeueAfter: time.Second * 5}, nil
}

func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
Expand All @@ -99,7 +99,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
if provisioner == nil {
return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}
<-provisioner.Add(ctx, pod)
select {
case <-provisioner.Add(pod):
case <-ctx.Done():
}
return nil
}

Expand Down

0 comments on commit 5c9cdfe

Please sign in to comment.