From 0b86c9d4fc66de8f3409b9cdd5b99fce1d5fb8d0 Mon Sep 17 00:00:00 2001
From: Ellis Tarn
Date: Sun, 2 Jan 2022 12:57:54 -0800
Subject: [PATCH 1/2] Rewrote batching logic to no longer rely on a results
 channel

---
 pkg/controllers/provisioning/batcher.go     | 102 ++++++++++++++++++
 pkg/controllers/provisioning/provisioner.go | 110 ++++++--------------
 pkg/controllers/selection/controller.go     |   3 +-
 pkg/test/expectations/expectations.go       |   2 +-
 4 files changed, 134 insertions(+), 83 deletions(-)
 create mode 100644 pkg/controllers/provisioning/batcher.go
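
Notes (illustrative, not part of this commit): the sketch below shows how the
Batcher added in batcher.go is intended to be driven, with a consumer loop
calling Wait/Flush and producers blocking on the gate returned by Add. The
NewBatcher, Add, Wait, and Flush signatures are taken from this patch (the
follow-up patch in this series later drops the context parameters). The main
function, the process helper, and the "example-item" value are assumptions
invented for the example, and the import path is copied as it appears in this
diff.

    package main

    import (
        "context"
        "fmt"

        "github.com/aws/karpenter/pkg/controllers/provisioning"
    )

    func process(items []interface{}) {
        // Placeholder for real work, e.g. scheduling and launching capacity.
        fmt.Printf("processing %d items\n", len(items))
    }

    func main() {
        ctx, stop := context.WithCancel(context.Background())
        defer stop()
        batcher := provisioning.NewBatcher(ctx)

        // Consumer: Wait blocks for the first item, then keeps the window open
        // until BatchIdleDuration of quiet, MaxBatchDuration, or MaxItemsPerBatch.
        go func() {
            for ctx.Err() == nil {
                items, window := batcher.Wait(ctx)
                process(items)
                fmt.Printf("batched %d items in %s\n", len(items), window)
                batcher.Flush() // release every caller blocked on this batch's gate
            }
        }()

        // Producer: Add enqueues an item and returns the current gate; receiving
        // from the gate blocks until the batch containing the item is flushed.
        <-batcher.Add(ctx, "example-item")
    }
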
diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
new file mode 100644
index 000000000000..1f6ef7c72cb2
--- /dev/null
+++ b/pkg/controllers/provisioning/batcher.go
@@ -0,0 +1,102 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provisioning
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+var (
+	MaxBatchDuration = time.Second * 10
+	BatchIdleDuration = time.Second * 1
+	// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
+	MaxItemsPerBatch = 2_000
+)
+
+// Batcher separates a stream of Add(item) calls into windowed slices. The
+// window is dynamic and will be extended if additional items are added up to a
+// maximum batch duration or maximum items per batch.
+type Batcher struct {
+	sync.RWMutex
+	running context.Context
+	queue   chan interface{}
+	gate    context.Context
+	flush   context.CancelFunc
+}
+
+// NewBatcher is a constructor
+func NewBatcher(running context.Context) *Batcher {
+	gate, flush := context.WithCancel(running)
+	return &Batcher{
+		running: running,
+		queue:   make(chan interface{}),
+		gate:    gate,
+		flush:   flush,
+	}
+}
+
+// Add an item to the batch, returning the next gate which the caller may block
+// on. The gate is protected by a read-write mutex, and may be modified by
+// Flush(), which makes a new gate.
+func (b *Batcher) Add(ctx context.Context, item interface{}) <-chan struct{} {
+	b.RLock()
+	defer b.RUnlock()
+	select {
+	case b.queue <- item:
+		return b.gate.Done()
+	case <-b.gate.Done():
+		return b.gate.Done()
+	case <-ctx.Done():
+		return ctx.Done()
+	}
+}
+
+// Flush all goroutines blocking on the current gate and create a new gate.
+func (b *Batcher) Flush() {
+	b.Lock()
+	defer b.Unlock()
+	b.flush()
+	b.gate, b.flush = context.WithCancel(b.running)
+}
+
+// Wait returns a slice of enqueued items after idle or timeout
+func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
+	// Start the batching window after the first item is received
+	items = append(items, <-b.queue)
+	start := time.Now()
+	defer func() {
+		window = time.Since(start)
+	}()
+	timeout := time.NewTimer(MaxBatchDuration)
+	idle := time.NewTimer(BatchIdleDuration)
+	for {
+		if len(items) >= MaxItemsPerBatch {
+			return
+		}
+		select {
+		case task := <-b.queue:
+			idle.Reset(BatchIdleDuration)
+			items = append(items, task)
+		case <-ctx.Done():
+			return
+		case <-timeout.C:
+			return
+		case <-idle.C:
+			return
+		}
+	}
+}
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 488ccb7c880f..98cdde01ecb4 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"sync/atomic"
-	"time"
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
@@ -27,11 +26,11 @@ import (
 	"github.com/aws/karpenter/pkg/metrics"
 	"github.com/aws/karpenter/pkg/utils/functional"
 	"github.com/aws/karpenter/pkg/utils/injection"
+	"github.com/aws/karpenter/pkg/utils/pod"
 	"github.com/prometheus/client_golang/prometheus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/util/workqueue"
 	"knative.dev/pkg/logging"
@@ -39,20 +38,11 @@ import (
 	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 )
 
-var (
-	MaxBatchDuration = time.Second * 10
-	MinBatchDuration = time.Second * 1
-	// MaxPodsPerBatch limits the number of pods we process at one time to avoid using too much memory
-	MaxPodsPerBatch = 2_000
-)
-
 func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
 	running, stop := context.WithCancel(ctx)
 	p := &Provisioner{
 		Provisioner:   provisioner,
-		pods:          make(chan *v1.Pod),
-		wait:          make(chan struct{}),
-		running:       running,
+		batcher:       NewBatcher(running),
 		Stop:          stop,
 		cloudProvider: cloudProvider,
 		kubeClient:    kubeClient,
@@ -61,13 +51,12 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
 		packer:        binpacking.NewPacker(kubeClient, cloudProvider),
 	}
 	go func() {
-		for p.running.Err() == nil {
+		for ctx.Err() == nil {
 			if err := p.provision(ctx); err != nil {
 				logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
 			}
 		}
-		close(p.wait)
-		logging.FromContext(ctx).Info("Stopping provisioner")
+		logging.FromContext(ctx).Info("Stopped provisioner")
 	}()
 	return p
 }
@@ -76,9 +65,7 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
 type Provisioner struct {
 	// State
 	*v1alpha5.Provisioner
-	pods          chan *v1.Pod
-	wait          chan struct{}
-	running       context.Context
+	batcher       *Batcher
 	Stop          context.CancelFunc
 	// Dependencies
 	cloudProvider cloudprovider.CloudProvider
@@ -89,28 +76,27 @@ type Provisioner struct {
 }
 
 // Add a pod to the provisioner and block until it's processed. The caller
-// is responsible for verifying that the pod was scheduled correctly. In the
-// future, this may be expanded to include concepts such as retriable errors.
-func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
-	select {
-	case p.pods <- pod: // Block until pod is enqueued
-		<-p.wait
-	case <-p.running.Done(): // Leave if closed
-	}
+// is responsible for verifying that the pod was scheduled correctly.
+func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
+	return p.batcher.Add(ctx, pod)
 }
 
 func (p *Provisioner) provision(ctx context.Context) (err error) {
-	// Wait for a batch of pods, release when done
-	pods := p.batch(ctx)
-	defer func() {
-		for i := 0; i < len(pods); i++ {
-			p.wait <- struct{}{}
+	// Batch pods
+	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
+	items, window := p.batcher.Wait(ctx)
+	defer p.batcher.Flush()
+	logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
+	// Filter pods
+	pods := []*v1.Pod{}
+	for _, item := range items {
+		provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
+		if err != nil {
+			return err
+		}
+		if provisionable {
+			pods = append(pods, item.(*v1.Pod))
 		}
-	}()
-	// Ensure pods are still provisionable
-	pods, err = p.filter(ctx, pods)
-	if err != nil {
-		return fmt.Errorf("filtering provisionable pods, %w", err)
 	}
 	// Separate pods by scheduling constraints
 	schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
@@ -133,55 +119,19 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
 	return nil
 }
 
-// Batch returns a slice of enqueued pods after idle or timeout
-func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
-	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
-	// Start the batching window after the first pod is received
-	pods = append(pods, <-p.pods)
-	timeout := time.NewTimer(MaxBatchDuration)
-	idle := time.NewTimer(MinBatchDuration)
-	start := time.Now()
-	defer func() {
-		logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
-	}()
-	for {
-		if len(pods) >= MaxPodsPerBatch {
-			return pods
-		}
-		select {
-		case pod := <-p.pods:
-			idle.Reset(MinBatchDuration)
-			pods = append(pods, pod)
-		case <-ctx.Done():
-			return pods
-		case <-timeout.C:
-			return pods
-		case <-idle.C:
-			return pods
-		}
-	}
-}
-
-// filter removes pods that have been assigned a node.
+// isProvisionable ensures that the pod can still be provisioned.
 // This check is needed to prevent duplicate binds when a pod is scheduled to a node
 // between the time it was ingested into the scheduler and the time it is included
 // in a provisioner batch.
-func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
-	provisionable := []*v1.Pod{}
-	for _, pod := range pods {
-		// Do not mutate the pod in case the scheduler relaxed constraints
-		stored := &v1.Pod{}
-		if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
-			if errors.IsNotFound(err) {
-				continue
-			}
-			return nil, err
-		}
-		if stored.Spec.NodeName == "" {
-			provisionable = append(provisionable, pod)
+func (p *Provisioner) isProvisionable(ctx context.Context, candidate *v1.Pod) (bool, error) {
+	stored := &v1.Pod{}
+	if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate), stored); err != nil {
+		if errors.IsNotFound(err) {
+			return false, nil
 		}
+		return false, err
 	}
-	return provisionable, nil
+	return !pod.IsScheduled(stored), nil
 }
 
 func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go
index 2c670ec183af..8199ead4833f 100644
--- a/pkg/controllers/selection/controller.go
+++ b/pkg/controllers/selection/controller.go
@@ -99,7 +99,7 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
 	if provisioner == nil {
 		return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
 	}
-	provisioner.Add(ctx, pod)
+	<-provisioner.Add(ctx, pod)
 	return nil
 }
 
@@ -109,7 +109,6 @@ func isProvisionable(p *v1.Pod) bool {
 		pod.FailedToSchedule(p) &&
 		!pod.IsOwnedByDaemonSet(p) &&
 		!pod.IsOwnedByNode(p)
-
 }
 
 func validate(p *v1.Pod) error {
diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go
index 8c71df3540d6..4e2bbb8b1671 100644
--- a/pkg/test/expectations/expectations.go
+++ b/pkg/test/expectations/expectations.go
@@ -158,7 +158,7 @@ func ExpectProvisioningCleanedUp(ctx context.Context, c client.Client, controlle
 }
 
 func ExpectProvisioned(ctx context.Context, c client.Client, selectionController *selection.Controller, provisioningController *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
-	provisioning.MaxPodsPerBatch = len(pods)
+	provisioning.MaxItemsPerBatch = len(pods)
 	// Persist objects
 	ExpectApplied(ctx, c, provisioner)
 	ExpectStatusUpdated(ctx, c, provisioner)

From 5c9cdfeab428d821b86bc1a17c9a9af6c5e69cb4 Mon Sep 17 00:00:00 2001
From: Ellis Tarn
Date: Tue, 4 Jan 2022 10:43:41 -0800
Subject: [PATCH 2/2] pausing

---
 pkg/controllers/provisioning/batcher.go     | 31 +++++++++++----------
 pkg/controllers/provisioning/provisioner.go | 18 ++++++------
 pkg/controllers/selection/controller.go     |  7 +++--
 3 files changed, 30 insertions(+), 26 deletions(-)
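
Notes (illustrative, not part of this commit): this change moves cancellation
out of Batcher.Add, which now only aborts the enqueue when the Batcher's own
running context is cancelled, so per-call cancellation becomes the caller's
job, as the selection controller hunk below shows. A hypothetical helper,
written only to illustrate that calling pattern (addWithTimeout and the
example package are not part of the patch):

    package example

    import (
        "context"
        "time"

        "github.com/aws/karpenter/pkg/controllers/provisioning"
        v1 "k8s.io/api/core/v1"
    )

    // addWithTimeout submits a pod and waits for its batch to be flushed, but
    // gives up once the supplied timeout expires.
    func addWithTimeout(ctx context.Context, p *provisioning.Provisioner, pod *v1.Pod, timeout time.Duration) error {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        select {
        case <-p.Add(pod): // released when the batch containing the pod is flushed
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
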
diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go
index 1f6ef7c72cb2..729c6957d9a1 100644
--- a/pkg/controllers/provisioning/batcher.go
+++ b/pkg/controllers/provisioning/batcher.go
@@ -21,7 +21,7 @@ import (
 )
 
 var (
-	MaxBatchDuration = time.Second * 10
+	MaxBatchDuration  = time.Second * 10
 	BatchIdleDuration = time.Second * 1
 	// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
 	MaxItemsPerBatch = 2_000
@@ -52,17 +52,20 @@ func NewBatcher(running context.Context) *Batcher {
 // Add an item to the batch, returning the next gate which the caller may block
 // on. The gate is protected by a read-write mutex, and may be modified by
 // Flush(), which makes a new gate.
-func (b *Batcher) Add(ctx context.Context, item interface{}) <-chan struct{} {
-	b.RLock()
-	defer b.RUnlock()
+//
+// In rare scenarios, if a goroutine hangs after enqueueing but before acquiring
+// the gate lock, the batch could be flushed, resulting in the pod waiting on
+// the next gate. This will be flushed on the next batch, and may result in
+// delayed retries for the individual pod if the provisioning loop fails. In
+// practice, this won't be encountered because this time window is O(seconds).
+func (b *Batcher) Add(item interface{}) <-chan struct{} {
 	select {
 	case b.queue <- item:
-		return b.gate.Done()
-	case <-b.gate.Done():
-		return b.gate.Done()
-	case <-ctx.Done():
-		return ctx.Done()
+	case <-b.running.Done():
 	}
+	b.RLock()
+	defer b.RUnlock()
+	return b.gate.Done()
 }
 
 // Flush all goroutines blocking on the current gate and create a new gate.
@@ -73,8 +76,8 @@ func (b *Batcher) Flush() {
 	b.gate, b.flush = context.WithCancel(b.running)
 }
 
-// Wait returns a slice of enqueued items after idle or timeout
-func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
+// Wait starts a batching window and returns a slice of items when closed.
+func (b *Batcher) Wait() (items []interface{}, window time.Duration) {
 	// Start the batching window after the first item is received
 	items = append(items, <-b.queue)
 	start := time.Now()
@@ -88,11 +91,9 @@ func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Du
 			return
 		}
 		select {
-		case task := <-b.queue:
+		case item := <-b.queue:
 			idle.Reset(BatchIdleDuration)
-			items = append(items, task)
-		case <-ctx.Done():
-			return
+			items = append(items, item)
 		case <-timeout.C:
 			return
 		case <-idle.C:
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 98cdde01ecb4..c51dc77524da 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -51,12 +51,12 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
 		packer:        binpacking.NewPacker(kubeClient, cloudProvider),
 	}
 	go func() {
-		for ctx.Err() == nil {
-			if err := p.provision(ctx); err != nil {
-				logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
+		for running.Err() == nil {
+			if err := p.provision(running); err != nil {
+				logging.FromContext(running).Errorf("Provisioning failed, %s", err.Error())
 			}
 		}
-		logging.FromContext(ctx).Info("Stopped provisioner")
+		logging.FromContext(running).Info("Stopped provisioner")
 	}()
 	return p
 }
@@ -75,16 +75,16 @@
 	packer        *binpacking.Packer
 }
 
-// Add a pod to the provisioner and block until it's processed. The caller
-// is responsible for verifying that the pod was scheduled correctly.
-func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
-	return p.batcher.Add(ctx, pod)
+// Add a pod to the provisioner and return a channel to block on. The caller is
+// responsible for verifying that the pod was scheduled correctly.
+func (p *Provisioner) Add(pod *v1.Pod) <-chan struct{} {
+	return p.batcher.Add(pod)
 }
 
 func (p *Provisioner) provision(ctx context.Context) (err error) {
 	// Batch pods
 	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
-	items, window := p.batcher.Wait(ctx)
+	items, window := p.batcher.Wait()
 	defer p.batcher.Flush()
 	logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
 	// Filter pods
diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go
index 8199ead4833f..b7b4648f19fc 100644
--- a/pkg/controllers/selection/controller.go
+++ b/pkg/controllers/selection/controller.go
@@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
 		return reconcile.Result{}, err
 	}
-	return reconcile.Result{RequeueAfter: time.Second * 1}, nil
+	return reconcile.Result{RequeueAfter: time.Second * 5}, nil
 }
 
 func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
@@ -99,7 +99,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
 	if provisioner == nil {
 		return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
 	}
-	<-provisioner.Add(ctx, pod)
+	select {
+	case <-provisioner.Add(pod):
+	case <-ctx.Done():
+	}
 	return nil
 }
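
Notes (illustrative, not part of either commit): a sketch of a test against the
final Batcher API from this series, demonstrating the Wait/Flush contract that
items enqueued while the window is open are coalesced into a single batch. The
test name and the use of the standard testing package are assumptions; the
repository's own suites are Ginkgo-based, so treat this purely as an
illustration.

    package provisioning_test

    import (
        "context"
        "sync"
        "testing"

        "github.com/aws/karpenter/pkg/controllers/provisioning"
    )

    func TestBatcherCoalescesItems(t *testing.T) {
        ctx, stop := context.WithCancel(context.Background())
        defer stop()
        b := provisioning.NewBatcher(ctx)

        // Three producers enqueue concurrently, then block on the gate.
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            i := i
            wg.Add(1)
            go func() {
                defer wg.Done()
                <-b.Add(i)
            }()
        }

        // Wait picks up the first item, keeps the window open while the rest
        // trickle in, and returns after BatchIdleDuration of quiet.
        items, window := b.Wait()
        b.Flush() // release the producers blocked on the gate
        wg.Wait()

        if len(items) != 3 {
            t.Fatalf("expected 3 items in one %s window, got %d", window, len(items))
        }
    }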