diff --git a/pkg/controllers/provisioning/batch.go b/pkg/controllers/provisioning/batch.go
new file mode 100644
index 000000000000..6b80c6056c6f
--- /dev/null
+++ b/pkg/controllers/provisioning/batch.go
@@ -0,0 +1,78 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provisioning
+
+import (
+	"context"
+	"time"
+)
+
+var (
+	MaxBatchDuration = time.Second * 10
+	MinBatchDuration = time.Second * 1
+	// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
+	MaxItemsPerBatch = 2_000
+)
+
+type Batch struct {
+	queue    chan interface{}
+	Done     <-chan struct{}
+	Complete context.CancelFunc
+}
+
+func NewBatch(ctx context.Context) *Batch {
+	ctx, cancel := context.WithCancel(ctx)
+	return &Batch{
+		queue:    make(chan interface{}, MaxItemsPerBatch),
+		Done:     ctx.Done(),
+		Complete: cancel,
+	}
+}
+
+// Add an item to the batch if still open
+func (b *Batch) Add(item interface{}) {
+	select {
+	case b.queue <- item:
+	case <-b.Done:
+	}
+}
+
+// Wait returns a slice of enqueued items after idle or timeout
+func (b *Batch) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
+	// Start the batching window after the first item is received
+	items = append(items, <-b.queue)
+	start := time.Now()
+	defer func() {
+		window = time.Since(start)
+	}()
+	timeout := time.NewTimer(MaxBatchDuration)
+	idle := time.NewTimer(MinBatchDuration)
+	for {
+		if len(items) >= MaxItemsPerBatch {
+			return
+		}
+		select {
+		case task := <-b.queue:
+			idle.Reset(MinBatchDuration)
+			items = append(items, task)
+		case <-ctx.Done():
+			return
+		case <-timeout.C:
+			return
+		case <-idle.C:
+			return
+		}
+	}
+}
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 488ccb7c880f..1bf211d92ec9 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -17,8 +17,8 @@ package provisioning
 import (
 	"context"
 	"fmt"
+	"sync"
 	"sync/atomic"
-	"time"
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
@@ -27,11 +27,11 @@ import (
 	"github.com/aws/karpenter/pkg/metrics"
 	"github.com/aws/karpenter/pkg/utils/functional"
 	"github.com/aws/karpenter/pkg/utils/injection"
+	"github.com/aws/karpenter/pkg/utils/pod"
 	"github.com/prometheus/client_golang/prometheus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/util/workqueue"
 	"knative.dev/pkg/logging"
@@ -39,20 +39,11 @@ import (
 	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 )
 
-var (
-	MaxBatchDuration = time.Second * 10
-	MinBatchDuration = time.Second * 1
-	// MaxPodsPerBatch limits the number of pods we process at one time to avoid using too much memory
-	MaxPodsPerBatch = 2_000
-)
-
 func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
-	running, stop := context.WithCancel(ctx)
+	ctx, stop := context.WithCancel(ctx)
 	p := &Provisioner{
 		Provisioner:   provisioner,
-		pods:          make(chan *v1.Pod),
-		wait:          make(chan struct{}),
-		running:       running,
+		batch:         NewBatch(ctx),
 		Stop:          stop,
 		cloudProvider: cloudProvider,
 		kubeClient:    kubeClient,
@@ -60,26 +51,17 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
 		scheduler:     scheduling.NewScheduler(kubeClient),
 		packer:        binpacking.NewPacker(kubeClient, cloudProvider),
 	}
-	go func() {
-		for p.running.Err() == nil {
-			if err := p.provision(ctx); err != nil {
-				logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
-			}
-		}
-		close(p.wait)
-		logging.FromContext(ctx).Info("Stopping provisioner")
-	}()
+	go p.run(ctx)
 	return p
 }
 
 // Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity.
 type Provisioner struct {
 	// State
+	sync.RWMutex
 	*v1alpha5.Provisioner
-	pods    chan *v1.Pod
-	wait    chan struct{}
-	running context.Context
-	Stop    context.CancelFunc
+	batch *Batch
+	Stop  context.CancelFunc
 	// Dependencies
 	cloudProvider cloudprovider.CloudProvider
 	kubeClient    client.Client
@@ -89,28 +71,44 @@ type Provisioner struct {
 }
 
 // Add a pod to the provisioner and block until it's processed. The caller
-// is responsible for verifying that the pod was scheduled correctly. In the
-// future, this may be expanded to include concepts such as retriable errors.
-func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
-	select {
-	case p.pods <- pod: // Block until pod is enqueued
-		<-p.wait
-	case <-p.running.Done(): // Leave if closed
+// is responsible for verifying that the pod was scheduled correctly.
+func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
+	p.RLock()
+	defer p.RUnlock()
+	p.batch.Add(pod)
+	return p.batch.Done
+}
+
+func (p *Provisioner) run(ctx context.Context) {
+	if ctx.Err() != nil {
+		logging.FromContext(ctx).Info("Stopping provisioner")
+		return
+	}
+	if err := p.provision(ctx); err != nil {
+		logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
 	}
+	p.run(ctx)
 }
 
 func (p *Provisioner) provision(ctx context.Context) (err error) {
-	// Wait for a batch of pods, release when done
-	pods := p.batch(ctx)
-	defer func() {
-		for i := 0; i < len(pods); i++ {
-			p.wait <- struct{}{}
+	// Batch pods
+	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
+	items, window := p.batch.Wait(ctx)
+	defer p.batch.Complete()
+	p.Lock()
+	p.batch = NewBatch(ctx)
+	p.Unlock()
+	logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
+	// Filter pods
+	pods := []*v1.Pod{}
+	for _, item := range items {
+		provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
+		if err != nil {
+			return err
+		}
+		if provisionable {
+			pods = append(pods, item.(*v1.Pod))
 		}
-	}()
-	// Ensure pods are still provisionable
-	pods, err = p.filter(ctx, pods)
-	if err != nil {
-		return fmt.Errorf("filtering provisionable pods, %w", err)
 	}
 	// Separate pods by scheduling constraints
 	schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
@@ -133,55 +131,19 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
 	return nil
 }
 
-// Batch returns a slice of enqueued pods after idle or timeout
-func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
-	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
-	// Start the batching window after the first pod is received
-	pods = append(pods, <-p.pods)
-	timeout := time.NewTimer(MaxBatchDuration)
-	idle := time.NewTimer(MinBatchDuration)
-	start := time.Now()
-	defer func() {
-		logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
-	}()
-	for {
-		if len(pods) >= MaxPodsPerBatch {
-			return pods
-		}
-		select {
-		case pod := <-p.pods:
-			idle.Reset(MinBatchDuration)
-			pods = append(pods, pod)
-		case <-ctx.Done():
-			return pods
-		case <-timeout.C:
-			return pods
-		case <-idle.C:
-			return pods
-		}
-	}
-}
-
-// filter removes pods that have been assigned a node.
+// isProvisionable ensures that the pod can still be provisioned.
 // This check is needed to prevent duplicate binds when a pod is scheduled to a node
 // between the time it was ingested into the scheduler and the time it is included
 // in a provisioner batch.
-func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
-	provisionable := []*v1.Pod{}
-	for _, pod := range pods {
-		// Do not mutate the pod in case the scheduler relaxed constraints
-		stored := &v1.Pod{}
-		if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
-			if errors.IsNotFound(err) {
-				continue
-			}
-			return nil, err
-		}
-		if stored.Spec.NodeName == "" {
-			provisionable = append(provisionable, pod)
+func (p *Provisioner) isProvisionable(ctx context.Context, candidate *v1.Pod) (bool, error) {
+	stored := &v1.Pod{}
+	if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate), stored); err != nil {
+		if errors.IsNotFound(err) {
+			return false, nil
 		}
+		return false, err
 	}
-	return provisionable, nil
+	return !pod.IsScheduled(stored), nil
 }
 
 func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go
index 2c670ec183af..8199ead4833f 100644
--- a/pkg/controllers/selection/controller.go
+++ b/pkg/controllers/selection/controller.go
@@ -99,7 +99,7 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
 	if provisioner == nil {
 		return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
 	}
-	provisioner.Add(ctx, pod)
+	<-provisioner.Add(ctx, pod)
 	return nil
 }
 
@@ -109,7 +109,6 @@ func isProvisionable(p *v1.Pod) bool {
 		pod.FailedToSchedule(p) &&
 		!pod.IsOwnedByDaemonSet(p) &&
 		!pod.IsOwnedByNode(p)
-
 }
 
 func validate(p *v1.Pod) error {
diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go
index 8c71df3540d6..4e2bbb8b1671 100644
--- a/pkg/test/expectations/expectations.go
+++ b/pkg/test/expectations/expectations.go
@@ -158,7 +158,7 @@ func ExpectProvisioningCleanedUp(ctx context.Context, c client.Client, controlle
 }
 
 func ExpectProvisioned(ctx context.Context, c client.Client, selectionController *selection.Controller, provisioningController *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
-	provisioning.MaxPodsPerBatch = len(pods)
+	provisioning.MaxItemsPerBatch = len(pods)
 	// Persist objects
 	ExpectApplied(ctx, c, provisioner)
 	ExpectStatusUpdated(ctx, c, provisioner)
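For reference, a minimal sketch (not part of the diff) of how a caller could drive the new Batch type end to end: producers call Add, a single consumer calls Wait, and Complete releases anyone selecting on Done. The standalone main package and the integer items are illustrative assumptions; the import path follows the file path introduced above.

package main

import (
	"context"
	"fmt"

	// Assumed import path, matching pkg/controllers/provisioning/batch.go above.
	"github.com/aws/karpenter/pkg/controllers/provisioning"
)

func main() {
	ctx := context.Background()
	batch := provisioning.NewBatch(ctx)

	// Producers enqueue items; Add returns once the item is buffered or the
	// batch has already been completed.
	go func() {
		for i := 0; i < 3; i++ {
			batch.Add(i)
		}
	}()

	// Wait blocks on the first item, then keeps collecting until the idle timer
	// (MinBatchDuration), the overall timer (MaxBatchDuration), or the
	// MaxItemsPerBatch cap closes the window.
	items, window := batch.Wait(ctx)
	fmt.Printf("collected %d items in %s\n", len(items), window)

	// Complete unblocks anything selecting on batch.Done, mirroring how the
	// provisioner releases callers of Add after a provisioning pass.
	batch.Complete()
}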