Rewrote batching logic to no longer rely on a results channel #1070

Merged · 2 commits · Jan 4, 2022
103 changes: 103 additions & 0 deletions pkg/controllers/provisioning/batcher.go
@@ -0,0 +1,103 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioning

import (
"context"
"sync"
"time"
)

var (
MaxBatchDuration = time.Second * 10
BatchIdleDuration = time.Second * 1
// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
MaxItemsPerBatch = 2_000
)

// Batcher separates a stream of Add(item) calls into windowed slices. The
// window is dynamic and will be extended if additional items are added up to a
// maximum batch duration or maximum items per batch.
type Batcher struct {
sync.RWMutex
running context.Context
queue chan interface{}
gate context.Context
flush context.CancelFunc
}

// NewBatcher returns a Batcher whose lifetime is bound to the running context
func NewBatcher(running context.Context) *Batcher {
gate, flush := context.WithCancel(running)
return &Batcher{
running: running,
queue: make(chan interface{}),
gate: gate,
flush: flush,
}
}

// Add an item to the batch, returning the next gate which the caller may block
// on. The gate is protected by a read-write mutex, and may be modified by
// Flush(), which makes a new gate.
//
// In rare scenarios, if a goroutine hangs after enqueueing but before acquiring
// the gate lock, the batch could be flushed, leaving the caller waiting on the
// next gate. That gate will be flushed with the next batch, which may delay
// retries for the individual pod if the provisioning loop fails. In practice,
// this won't be encountered because this time window is O(seconds).
func (b *Batcher) Add(item interface{}) <-chan struct{} {
select {
case b.queue <- item:
case <-b.running.Done():
}
b.RLock()
defer b.RUnlock()
return b.gate.Done()
}

// Flush all goroutines blocking on the current gate and create a new gate.
func (b *Batcher) Flush() {
b.Lock()
defer b.Unlock()
b.flush()
b.gate, b.flush = context.WithCancel(b.running)
}

// Wait starts a batching window and returns the accumulated items when the window closes.
func (b *Batcher) Wait() (items []interface{}, window time.Duration) {
// Start the batching window after the first item is received
items = append(items, <-b.queue)
start := time.Now()
defer func() {
window = time.Since(start)
}()
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(BatchIdleDuration)
for {
if len(items) >= MaxItemsPerBatch {
return
}
select {
case item := <-b.queue:
idle.Reset(BatchIdleDuration)
items = append(items, item)
case <-timeout.C:
return
case <-idle.C:
return
}
}
}
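
Not part of the diff: a minimal sketch of how the new Batcher is intended to be driven, assuming it sits alongside the type in the provisioning package. The exampleBatcherUsage name, the fmt output, and the five-producer loop are illustrative placeholders; the real consumer is the provisioning loop shown in the next file.

package provisioning

import (
	"context"
	"fmt"
	"time"
)

// exampleBatcherUsage is a hypothetical illustration of the producer/consumer
// handshake: producers block on the gate returned by Add, and a single Flush
// by the consumer releases all of them at once.
func exampleBatcherUsage() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b := NewBatcher(ctx)

	// Consumer: wait for a batch window to close, handle the items, then
	// Flush to release every producer blocked on the current gate.
	go func() {
		for ctx.Err() == nil {
			items, window := b.Wait()
			fmt.Printf("batched %d items in %s\n", len(items), window)
			// ... handle items here ...
			b.Flush()
		}
	}()

	// Producers: enqueue an item and block until the batch containing it
	// has been flushed, or the running context is cancelled.
	for i := 0; i < 5; i++ {
		go func(item int) {
			select {
			case <-b.Add(item):
			case <-ctx.Done():
			}
		}(i)
	}
	time.Sleep(2 * BatchIdleDuration) // let the idle timer close the window before returning
}
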
116 changes: 33 additions & 83 deletions pkg/controllers/provisioning/provisioner.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
@@ -27,32 +26,23 @@ import (
"github.com/aws/karpenter/pkg/metrics"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/pod"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
MaxBatchDuration = time.Second * 10
MinBatchDuration = time.Second * 1
// MaxPodsPerBatch limits the number of pods we process at one time to avoid using too much memory
MaxPodsPerBatch = 2_000
)

func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
pods: make(chan *v1.Pod),
wait: make(chan struct{}),
running: running,
batcher: NewBatcher(running),
Stop: stop,
cloudProvider: cloudProvider,
kubeClient: kubeClient,
@@ -61,13 +51,12 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
packer: binpacking.NewPacker(kubeClient, cloudProvider),
}
go func() {
for p.running.Err() == nil {
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
for running.Err() == nil {
if err := p.provision(running); err != nil {
logging.FromContext(running).Errorf("Provisioning failed, %s", err.Error())
}
}
close(p.wait)
logging.FromContext(ctx).Info("Stopping provisioner")
logging.FromContext(running).Info("Stopped provisioner")
}()
return p
}
@@ -76,9 +65,7 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
type Provisioner struct {
// State
*v1alpha5.Provisioner
pods chan *v1.Pod
wait chan struct{}
running context.Context
batcher *Batcher
Stop context.CancelFunc
// Dependencies
cloudProvider cloudprovider.CloudProvider
@@ -88,29 +75,28 @@ type Provisioner struct {
packer *binpacking.Packer
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
select {
case p.pods <- pod: // Block until pod is enqueued
<-p.wait
case <-p.running.Done(): // Leave if closed
}
// Add a pod to the provisioner and return a channel to block on. The caller is
// responsible for verifying that the pod was scheduled correctly.
func (p *Provisioner) Add(pod *v1.Pod) <-chan struct{} {
return p.batcher.Add(pod)
}

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Wait for a batch of pods, release when done
pods := p.batch(ctx)
defer func() {
for i := 0; i < len(pods); i++ {
p.wait <- struct{}{}
// Batch pods
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait()
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
pods := []*v1.Pod{}
for _, item := range items {
provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
if err != nil {
return err
}
if provisionable {
pods = append(pods, item.(*v1.Pod))
}
}()
// Ensure pods are still provisionable
pods, err = p.filter(ctx, pods)
if err != nil {
return fmt.Errorf("filtering provisionable pods, %w", err)
}
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
@@ -133,55 +119,19 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
return nil
}

// Batch returns a slice of enqueued pods after idle or timeout
func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
// Start the batching window after the first pod is received
pods = append(pods, <-p.pods)
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(MinBatchDuration)
start := time.Now()
defer func() {
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
if len(pods) >= MaxPodsPerBatch {
return pods
}
select {
case pod := <-p.pods:
idle.Reset(MinBatchDuration)
pods = append(pods, pod)
case <-ctx.Done():
return pods
case <-timeout.C:
return pods
case <-idle.C:
return pods
}
}
}

// filter removes pods that have been assigned a node.
// isProvisionable ensures that the pod can still be provisioned.
// This check is needed to prevent duplicate binds when a pod is scheduled to a node
// between the time it was ingested into the scheduler and the time it is included
// in a provisioner batch.
func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
provisionable := []*v1.Pod{}
for _, pod := range pods {
// Do not mutate the pod in case the scheduler relaxed constraints
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
if stored.Spec.NodeName == "" {
provisionable = append(provisionable, pod)
func (p *Provisioner) isProvisionable(ctx context.Context, candidate *v1.Pod) (bool, error) {
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate), stored); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return provisionable, nil
return !pod.IsScheduled(candidate), nil
}

func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
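
The mechanism that replaces the removed pods/wait handshake above (where a deferred loop had to send one struct{}{} per pod) is a cancellable context used as a broadcast gate: a single cancel releases every goroutine blocked on gate.Done(). A standalone sketch of just that pattern, independent of any Karpenter types:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	running := context.Background()
	// gate.Done() is the channel callers block on; flush() closes it for
	// everyone at once, which is what Batcher.Flush does under its lock.
	gate, flush := context.WithCancel(running)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-gate.Done() // released by the single flush() below
			fmt.Println("waiter", id, "released")
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // let the waiters park
	flush()                            // broadcast to all current waiters
	wg.Wait()
}
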
8 changes: 5 additions & 3 deletions pkg/controllers/selection/controller.go
@@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
return reconcile.Result{}, err
}
return reconcile.Result{RequeueAfter: time.Second * 1}, nil
return reconcile.Result{RequeueAfter: time.Second * 5}, nil
}

func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
@@ -99,7 +99,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
if provisioner == nil {
return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}
provisioner.Add(ctx, pod)
select {
case <-provisioner.Add(pod):
case <-ctx.Done():
}
return nil
}

@@ -109,7 +112,6 @@ func isProvisionable(p *v1.Pod) bool {
pod.FailedToSchedule(p) &&
!pod.IsOwnedByDaemonSet(p) &&
!pod.IsOwnedByNode(p)

}

func validate(p *v1.Pod) error {
2 changes: 1 addition & 1 deletion pkg/test/expectations/expectations.go
@@ -158,7 +158,7 @@ func ExpectProvisioningCleanedUp(ctx context.Context, c client.Client, controlle
}

func ExpectProvisioned(ctx context.Context, c client.Client, selectionController *selection.Controller, provisioningController *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
provisioning.MaxPodsPerBatch = len(pods)
provisioning.MaxItemsPerBatch = len(pods)
// Persist objects
ExpectApplied(ctx, c, provisioner)
ExpectStatusUpdated(ctx, c, provisioner)
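
The helper pins MaxItemsPerBatch to the number of pods under test so that Wait returns as soon as the last pod is enqueued rather than waiting out the idle timer. A hypothetical in-package test exercising that size-based flush directly (not part of this PR) might look like:

package provisioning

import (
	"context"
	"testing"
)

func TestBatcherFlushesAtMaxItems(t *testing.T) {
	MaxItemsPerBatch = 2 // force the size-based return path in Wait
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	b := NewBatcher(ctx)
	go b.Add("a")
	go b.Add("b")

	items, _ := b.Wait()
	if len(items) != 2 {
		t.Fatalf("expected 2 items, got %d", len(items))
	}
}
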