diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
index 1f6ef7c72cb2..729c6957d9a1 100644
--- a/pkg/controllers/provisioning/batcher.go
+++ b/pkg/controllers/provisioning/batcher.go
@@ -21,7 +21,7 @@ import (
 )
 
 var (
-	MaxBatchDuration = time.Second * 10
+	MaxBatchDuration  = time.Second * 10
 	BatchIdleDuration = time.Second * 1
 	// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
 	MaxItemsPerBatch = 2_000
@@ -52,17 +52,20 @@ func NewBatcher(running context.Context) *Batcher {
 // Add an item to the batch, returning the next gate which the caller may block
 // on. The gate is protected by a read-write mutex, and may be modified by
 // Flush(), which makes a new gate.
-func (b *Batcher) Add(ctx context.Context, item interface{}) <-chan struct{} {
-	b.RLock()
-	defer b.RUnlock()
+//
+// In rare scenarios, if a goroutine hangs after enqueueing but before acquiring
+// the gate lock, the batch could be flushed, resulting in the pod waiting on
+// the next gate. This will be flushed on the next batch, and may result in
+// delayed retries for the individual pod if the provisioning loop fails. In
+// practice, this won't be encountered because this time window is O(seconds).
+func (b *Batcher) Add(item interface{}) <-chan struct{} {
 	select {
 	case b.queue <- item:
-		return b.gate.Done()
-	case <-b.gate.Done():
-		return b.gate.Done()
-	case <-ctx.Done():
-		return ctx.Done()
+	case <-b.running.Done():
 	}
+	b.RLock()
+	defer b.RUnlock()
+	return b.gate.Done()
 }
 
 // Flush all goroutines blocking on the current gate and create a new gate.
@@ -73,8 +76,8 @@ func (b *Batcher) Flush() {
 	b.gate, b.flush = context.WithCancel(b.running)
 }
 
-// Wait returns a slice of enqueued items after idle or timeout
-func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
+// Wait starts a batching window and returns a slice of items when closed.
+func (b *Batcher) Wait() (items []interface{}, window time.Duration) {
 	// Start the batching window after the first item is received
 	items = append(items, <-b.queue)
 	start := time.Now()
@@ -88,11 +91,9 @@ func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Du
 			return
 		}
 		select {
-		case task := <-b.queue:
+		case item := <-b.queue:
 			idle.Reset(BatchIdleDuration)
-			items = append(items, task)
-		case <-ctx.Done():
-			return
+			items = append(items, item)
 		case <-timeout.C:
 			return
 		case <-idle.C:
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 98cdde01ecb4..c51dc77524da 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -51,12 +51,12 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
 		packer:        binpacking.NewPacker(kubeClient, cloudProvider),
 	}
 	go func() {
-		for ctx.Err() == nil {
-			if err := p.provision(ctx); err != nil {
-				logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
+		for running.Err() == nil {
+			if err := p.provision(running); err != nil {
+				logging.FromContext(running).Errorf("Provisioning failed, %s", err.Error())
 			}
 		}
-		logging.FromContext(ctx).Info("Stopped provisioner")
+		logging.FromContext(running).Info("Stopped provisioner")
 	}()
 	return p
 }
@@ -75,16 +75,16 @@ type Provisioner struct {
 	packer        *binpacking.Packer
 }
 
-// Add a pod to the provisioner and block until it's processed. The caller
-// is responsible for verifying that the pod was scheduled correctly.
-func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
-	return p.batcher.Add(ctx, pod)
+// Add a pod to the provisioner and return a channel to block on. The caller is
+// responsible for verifying that the pod was scheduled correctly.
+func (p *Provisioner) Add(pod *v1.Pod) <-chan struct{} {
+	return p.batcher.Add(pod)
 }
 
 func (p *Provisioner) provision(ctx context.Context) (err error) {
 	// Batch pods
 	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
-	items, window := p.batcher.Wait(ctx)
+	items, window := p.batcher.Wait()
 	defer p.batcher.Flush()
 	logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
 	// Filter pods
diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go
index 8199ead4833f..b7b4648f19fc 100644
--- a/pkg/controllers/selection/controller.go
+++ b/pkg/controllers/selection/controller.go
@@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
 		return reconcile.Result{}, err
 	}
-	return reconcile.Result{RequeueAfter: time.Second * 1}, nil
+	return reconcile.Result{RequeueAfter: time.Second * 5}, nil
 }
 
 func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
@@ -99,7 +99,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
 	if provisioner == nil {
 		return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
 	}
-	<-provisioner.Add(ctx, pod)
+	select {
+	case <-provisioner.Add(pod):
+	case <-ctx.Done():
+	}
 	return nil
 }
 
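For reviewers, here is a minimal, self-contained sketch of the gate-based handoff this change moves to: producers enqueue an item and then block on the returned gate (or on their own context, as the selection controller now does), while a single consumer loop waits for a batch, does its work, and flushes. The `miniBatcher` type, the fixed 50ms idle window, and the `main` driver are illustrative stand-ins and are not Karpenter code; the real Batcher in `pkg/controllers/provisioning` additionally enforces `MaxBatchDuration` and `MaxItemsPerBatch` inside `Wait`.

```go
// Hypothetical sketch of the gate-based batching pattern in this diff.
// miniBatcher is a simplified stand-in for the real Batcher.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type miniBatcher struct {
	sync.RWMutex
	running context.Context
	queue   chan interface{}
	gate    context.Context
	flush   context.CancelFunc
}

func newMiniBatcher(running context.Context) *miniBatcher {
	gate, flush := context.WithCancel(running)
	return &miniBatcher{running: running, queue: make(chan interface{}), gate: gate, flush: flush}
}

// Add enqueues the item (or gives up if the batcher is shutting down) and
// returns the current gate, which closes when the batch is flushed.
func (b *miniBatcher) Add(item interface{}) <-chan struct{} {
	select {
	case b.queue <- item:
	case <-b.running.Done():
	}
	b.RLock()
	defer b.RUnlock()
	return b.gate.Done()
}

// Flush releases everyone blocked on the current gate and installs a new one.
func (b *miniBatcher) Flush() {
	b.Lock()
	defer b.Unlock()
	b.flush()
	b.gate, b.flush = context.WithCancel(b.running)
}

// Wait blocks for the first item, then drains until a short idle period
// elapses (a stand-in for the idle/timeout windows in the real code).
func (b *miniBatcher) Wait() (items []interface{}) {
	items = append(items, <-b.queue)
	idle := time.NewTimer(50 * time.Millisecond)
	defer idle.Stop()
	for {
		select {
		case item := <-b.queue:
			items = append(items, item)
			idle.Reset(50 * time.Millisecond)
		case <-idle.C:
			return items
		}
	}
}

func main() {
	running, stop := context.WithCancel(context.Background())
	defer stop()
	b := newMiniBatcher(running)

	// Consumer loop, mirroring Provisioner.provision: Wait, work, then Flush.
	go func() {
		for running.Err() == nil {
			items := b.Wait()
			fmt.Printf("batched %d items\n", len(items))
			b.Flush()
		}
	}()

	// Producer, mirroring the selection controller: block on the gate or on
	// the request context, whichever closes first.
	ctx, cancel := context.WithTimeout(running, time.Second)
	defer cancel()
	select {
	case <-b.Add("pod-a"):
		fmt.Println("batch flushed")
	case <-ctx.Done():
		fmt.Println("request context cancelled")
	}
}
```

The design point this illustrates is that cancellation moves out of the batcher: `Add` no longer takes a per-caller context, so a cancelled request simply stops waiting on the gate (via the caller's own `select`) without disturbing the shared batch.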