Commit

Rewrote batching logic to no longer rely on a results channel

ellistarn committed Jan 4, 2022
1 parent 1e51d5e commit 47a560b
Showing 4 changed files with 129 additions and 83 deletions.
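The rewrite drops the provisioner's results channel, where the worker had to send one signal per enqueued pod, in favor of a cancellable "gate" context that a single flush uses to release every waiter at once. As a rough, self-contained illustration of that pattern (not code from this commit):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// The "gate" is a context; cancelling it is the broadcast that replaces
	// sending one value per waiter on a results channel.
	gate, flush := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			<-gate.Done() // every waiter blocks on the same Done() channel
			fmt.Printf("waiter %d released\n", i)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // give the waiters time to start (illustration only)
	flush()                            // one cancel releases all waiters at once
	wg.Wait()
}

The Batcher added in batcher.go below wraps exactly this idea, recreating the gate after each flush.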
95 changes: 95 additions & 0 deletions pkg/controllers/provisioning/batcher.go
@@ -0,0 +1,95 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioning

import (
"context"
"sync"
"time"
)

var (
MaxBatchDuration = time.Second * 10
MinBatchDuration = time.Second * 1
// MaxItemsPerBatch limits the number of items we process at one time to avoid using too much memory
MaxItemsPerBatch = 2_000
)

type Batcher struct {
sync.RWMutex
running context.Context
queue chan interface{}
gate context.Context
flush context.CancelFunc
}

func NewBatcher(running context.Context) *Batcher {
gate, flush := context.WithCancel(running)
return &Batcher{
running: running,
queue: make(chan interface{}),
gate: gate,
flush: flush,
}
}

// Add an item to the batch if still open
func (b *Batcher) Add(ctx context.Context, item interface{}) <-chan struct{} {
b.RLock()
defer b.RUnlock()
select {
case b.queue <- item:
return b.gate.Done()
case <-b.gate.Done():
return b.gate.Done()
case <-ctx.Done():
return ctx.Done()
}
}

func (b *Batcher) Flush() {
b.Lock()
defer b.Unlock()
b.flush()
b.gate, b.flush = context.WithCancel(b.running)
}

// Wait returns a slice of enqueued items after idle or timeout
func (b *Batcher) Wait(ctx context.Context) (items []interface{}, window time.Duration) {
// Start the batching window after the first item is received
items = append(items, <-b.queue)
start := time.Now()
defer func() {
window = time.Since(start)
}()
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(MinBatchDuration)
for {
if len(items) >= MaxItemsPerBatch {
return
}
select {
case task := <-b.queue:
idle.Reset(MinBatchDuration)
items = append(items, task)
case <-ctx.Done():
return
case <-timeout.C:
return
case <-idle.C:
return
}
}
}
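A rough usage sketch of the Batcher above, written as if it lived in the same provisioning package (the exampleBatcherUsage and handle names are hypothetical, not part of the commit): producers block on the channel returned by Add until the consumer flushes the batch they joined.

// example_batcher_usage.go (hypothetical file, same package as batcher.go)
package provisioning

import "context"

func exampleBatcherUsage(ctx context.Context, handle func([]interface{})) {
	b := NewBatcher(ctx)

	// Consumer: collect a batch, process it, then release everyone who enqueued into it.
	go func() {
		for ctx.Err() == nil {
			items, window := b.Wait(ctx) // returns after idle, timeout, or MaxItemsPerBatch
			_ = window                   // how long the batch stayed open; the provisioner logs this
			handle(items)
			b.Flush() // cancels the current gate so every blocked Add caller returns
		}
	}()

	// Producer: enqueue an item and block until its batch has been flushed.
	<-b.Add(ctx, "some item")
}

This mirrors how provisioner.go below consumes the Batcher: Wait/Flush in the provisioning loop, Add on the caller side.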
112 changes: 32 additions & 80 deletions pkg/controllers/provisioning/provisioner.go
@@ -17,8 +17,8 @@ package provisioning
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
@@ -27,32 +27,23 @@
"github.com/aws/karpenter/pkg/metrics"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/pod"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
MaxBatchDuration = time.Second * 10
MinBatchDuration = time.Second * 1
// MaxPodsPerBatch limits the number of pods we process at one time to avoid using too much memory
MaxPodsPerBatch = 2_000
)

func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
pods: make(chan *v1.Pod),
wait: make(chan struct{}),
running: running,
batcher: NewBatcher(running),
Stop: stop,
cloudProvider: cloudProvider,
kubeClient: kubeClient,
@@ -61,24 +52,22 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
packer: binpacking.NewPacker(kubeClient, cloudProvider),
}
go func() {
for p.running.Err() == nil {
for ctx.Err() == nil {
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
}
close(p.wait)
logging.FromContext(ctx).Info("Stopping provisioner")
logging.FromContext(ctx).Info("Stopped provisioner")
}()
return p
}

// Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity.
type Provisioner struct {
// State
sync.RWMutex
*v1alpha5.Provisioner
pods chan *v1.Pod
wait chan struct{}
running context.Context
batcher *Batcher
Stop context.CancelFunc
// Dependencies
cloudProvider cloudprovider.CloudProvider
@@ -89,28 +78,27 @@ type Provisioner struct {
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
select {
case p.pods <- pod: // Block until pod is enqueued
<-p.wait
case <-p.running.Done(): // Leave if closed
}
// is responsible for verifying that the pod was scheduled correctly.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) <-chan struct{} {
return p.batcher.Add(ctx, pod)
}

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Wait for a batch of pods, release when done
pods := p.batch(ctx)
defer func() {
for i := 0; i < len(pods); i++ {
p.wait <- struct{}{}
// Batch pods
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait(ctx)
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
pods := []*v1.Pod{}
for _, item := range items {
provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
if err != nil {
return err
}
if provisionable {
pods = append(pods, item.(*v1.Pod))
}
}()
// Ensure pods are still provisionable
pods, err = p.filter(ctx, pods)
if err != nil {
return fmt.Errorf("filtering provisionable pods, %w", err)
}
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
@@ -133,55 +121,19 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
return nil
}

// Batch returns a slice of enqueued pods after idle or timeout
func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
// Start the batching window after the first pod is received
pods = append(pods, <-p.pods)
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(MinBatchDuration)
start := time.Now()
defer func() {
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
if len(pods) >= MaxPodsPerBatch {
return pods
}
select {
case pod := <-p.pods:
idle.Reset(MinBatchDuration)
pods = append(pods, pod)
case <-ctx.Done():
return pods
case <-timeout.C:
return pods
case <-idle.C:
return pods
}
}
}

// filter removes pods that have been assigned a node.
// isProvisionable ensure that the pod can still be provisioned.
// This check is needed to prevent duplicate binds when a pod is scheduled to a node
// between the time it was ingested into the scheduler and the time it is included
// in a provisioner batch.
func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
provisionable := []*v1.Pod{}
for _, pod := range pods {
// Do not mutate the pod in case the scheduler relaxed constraints
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
if stored.Spec.NodeName == "" {
provisionable = append(provisionable, pod)
func (p *Provisioner) isProvisionable(ctx context.Context, candidate *v1.Pod) (bool, error) {
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(candidate), stored); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return provisionable, nil
return !pod.IsScheduled(candidate), nil
}

func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
3 changes: 1 addition & 2 deletions pkg/controllers/selection/controller.go
@@ -99,7 +99,7 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs e
if provisioner == nil {
return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}
provisioner.Add(ctx, pod)
<-provisioner.Add(ctx, pod)
return nil
}

@@ -109,7 +109,6 @@ func isProvisionable(p *v1.Pod) bool {
pod.FailedToSchedule(p) &&
!pod.IsOwnedByDaemonSet(p) &&
!pod.IsOwnedByNode(p)

}

func validate(p *v1.Pod) error {
2 changes: 1 addition & 1 deletion pkg/test/expectations/expectations.go
@@ -158,7 +158,7 @@ func ExpectProvisioningCleanedUp(ctx context.Context, c client.Client, controlle
}

func ExpectProvisioned(ctx context.Context, c client.Client, selectionController *selection.Controller, provisioningController *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) {
provisioning.MaxPodsPerBatch = len(pods)
provisioning.MaxItemsPerBatch = len(pods)
// Persist objects
ExpectApplied(ctx, c, provisioner)
ExpectStatusUpdated(ctx, c, provisioner)
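The renamed cap also doubles as a test speed-up: because Batcher.Wait returns as soon as len(items) reaches MaxItemsPerBatch, setting the cap to the number of expected pods closes the batch the moment the last pod is enqueued instead of waiting out the MinBatchDuration idle timer. That reading is an inference from the Wait loop above, not something the commit states; a hypothetical test in the same package as batcher.go (the test name is an assumption) would look like:

package provisioning

import (
	"context"
	"testing"
)

func TestBatchClosesAtCap(t *testing.T) {
	MaxItemsPerBatch = 2 // same trick ExpectProvisioned uses with len(pods)

	ctx := context.Background()
	b := NewBatcher(ctx)

	go func() {
		b.Add(ctx, "a")
		b.Add(ctx, "b")
	}()

	// Wait returns immediately once the cap is hit, without waiting for the idle timer.
	items, _ := b.Wait(ctx)
	if len(items) != 2 {
		t.Fatalf("expected 2 items, got %d", len(items))
	}
	b.Flush()
}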
