Adding non-blocking asynchronous bind support.
alex-berger committed Aug 24, 2021
1 parent f946dc5 commit c00b9bb
Showing 6 changed files with 123 additions and 53 deletions.
1 change: 1 addition & 0 deletions pkg/apis/provisioning/v1alpha3/provisioner.go
@@ -133,6 +133,7 @@ var (
 	// Reserved annotations
 	DoNotEvictPodAnnotationKey      = SchemeGroupVersion.Group + "/do-not-evict"
 	EmptinessTimestampAnnotationKey = SchemeGroupVersion.Group + "/emptiness-timestamp"
+	AsyncBindFailureAnnotationKey   = SchemeGroupVersion.Group + "/async-bind-failed-at"
 	// Finalizers
 	TerminationFinalizer = SchemeGroupVersion.Group + "/termination"
 	// Default provisioner
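The new `AsyncBindFailureAnnotationKey` follows the same pattern as the other reserved annotations: the provisioning API group plus a suffix. Below is a minimal sketch of how such a group-qualified key can be composed and stamped onto a pod, in the spirit of `triggerReconcileFor` in bind.go; the hard-coded group string and the standalone `main` are illustrative assumptions, not part of this commit.

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Assumed group name for illustration; the real value comes from
// v1alpha3.SchemeGroupVersion.Group in the karpenter code base.
const group = "provisioning.karpenter.sh"

// Reserved annotation keys, composed the same way as in provisioner.go.
var (
	DoNotEvictPodAnnotationKey      = group + "/do-not-evict"
	EmptinessTimestampAnnotationKey = group + "/emptiness-timestamp"
	AsyncBindFailureAnnotationKey   = group + "/async-bind-failed-at"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"}}
	// Annotations may be nil on a freshly constructed or decoded object.
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	// Record when an asynchronous bind attempt failed for this pod.
	pod.Annotations[AsyncBindFailureAnnotationKey] = time.Now().Format(time.RFC3339)
	fmt.Println(pod.Annotations)
}
```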
3 changes: 2 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -23,6 +23,7 @@ import (
 	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
 	"github.com/awslabs/karpenter/pkg/cloudprovider"
 	"github.com/awslabs/karpenter/pkg/utils/functional"
+	"github.com/awslabs/karpenter/pkg/utils/node"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -47,7 +48,7 @@ func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provis
 	go func() {
 		taints := make([]v1.Taint, 0)
 		taints = append(taints, packing.Constraints.Taints...)
-		taints = append(taints, packing.Constraints.ReadinessTaints...)
+		taints = node.UniqueTaints(taints, packing.Constraints.ReadinessTaints...)
 		err <- bind(&v1.Node{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: name,
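The fake cloud provider's `Create` keeps its non-blocking shape: it returns right away and reports the result of the `bind` callback over a channel from a goroutine. Below is a dependency-free sketch of that pattern, with made-up `createNode`/`bind` names standing in for the fake provider's internals.

```go
package main

import (
	"fmt"
	"time"
)

// createNode simulates the fake cloud provider's Create: it returns immediately
// with a channel and performs node creation plus the bind callback in the background.
func createNode(name string, bind func(node string) error) <-chan error {
	errCh := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for instance provisioning latency
		errCh <- bind(name)               // report the bind result to the caller
	}()
	return errCh
}

func main() {
	errCh := createNode("node-1", func(node string) error {
		fmt.Println("binding pods to", node)
		return nil
	})
	// The caller decides when (or whether) to block on the result, which is what
	// keeps the bind path non-blocking for the reconciler.
	if err := <-errCh; err != nil {
		fmt.Println("bind failed:", err)
	}
}
```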
120 changes: 68 additions & 52 deletions pkg/controllers/allocation/bind.go
@@ -39,24 +39,9 @@ type Binder struct {
 }
 
 func (b *Binder) Bind(ctx context.Context, node *v1.Node, packing *cloudprovider.Packing) error {
-	pods := packing.Pods
-	stored := node.DeepCopy()
 	// 1. Add the Karpenter finalizer to the node to enable the termination workflow
 	node.Finalizers = append(node.Finalizers, v1alpha3.TerminationFinalizer)
-	// 2. Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
-	// from scheduling pods before we're able to bind them ourselves. The kube
-	// scheduler has an eventually consistent cache of nodes and pods, so it's
-	// possible for it to see a provisioned node before it sees the pods bound
-	// to it. This creates an edge case where other pending pods may be bound to
-	// the node by the kube scheduler, causing OutOfCPU errors when the
-	// binpacked pods race to bind to the same node. The system eventually
-	// heals, but causes delays from additional provisioning (thrash). This
-	// taint will be removed by the node controller when a node is marked ready.
-	node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
-		Key: v1alpha3.NotReadyTaintKey,
-		Effect: v1.TaintEffectNoSchedule,
-	})
-	// 3. Idempotently create a node. In rare cases, nodes can come online and
+	// 2. Idempotently create a node. In rare cases, nodes can come online and
 	// self register before the controller is able to register a node object
 	// with the API server. In the common case, we create the node object
 	// ourselves to enforce the binding decision and enable images to be pulled
@@ -65,47 +50,74 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, packing *cloudprovider.Packing) error {
 		if !errors.IsAlreadyExists(err) {
 			return fmt.Errorf("creating node %s, %w", node.Name, err)
 		}
-		// If the node object already exists, make sure finalizer and taint are in place.
-		if err := b.KubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
-			return fmt.Errorf("patching node %s, %w", node.Name, err)
-		}
+		// If the node object already exists, the finalizer controller will make sure
+		// the finalizer is in place, so we can ignore this case here.
 	}
-	errs := make([]error, len(pods))
-	// 4. Wait for node readiness.
-	if len(packing.Constraints.ReadinessTaints) > 0 {
-		// 4.1. Partitiony pods into readiness tolerant and intolerant sets of pods
-		intolerantPods := make([]*v1.Pod, 0)
-		tolerantPods := make([]*v1.Pod, 0)
-		for _, p := range pods {
-			if pod.ToleratesTaints(&p.Spec, packing.Constraints.ReadinessTaints...) == nil {
-				tolerantPods = append(tolerantPods, p)
-			} else {
-				intolerantPods = append(intolerantPods, p)
-			}
-		}
-		/// 4.2. Bind all Pods which tolerate all readiness taints
-		workqueue.ParallelizeUntil(ctx, len(tolerantPods), len(tolerantPods), func(index int) {
-			errs[index] = b.bind(ctx, node, tolerantPods[index])
-		})
-		// 4.3. If there are readiness intolerant pods left, which do not tolerate all readiness taints,
-		// wait for all readiness taints to be removed from the node
-		pods = intolerantPods
-		if len(pods) > 0 {
-			var err error
-			node, err = b.waitForNodeReadiness(ctx, node, packing.Constraints.ReadinessTaints)
-			if err != nil {
-				errs = append(errs, err)
-				return multierr.Combine(errs...)
-			}
-		}
-	}
-	// 5. Bind pods
-	workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(index int) {
-		errs[index] = b.bind(ctx, node, pods[index])
-	})
-	err := multierr.Combine(errs...)
-	logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", len(pods)-len(multierr.Errors(err)), node.Name)
-	return err
+	// 3 Partition pods into readiness tolerant (immediately schedulable) and
+	// intolerant (not yet schedulable) sets of pods
+	intolerantPods := make([]*v1.Pod, 0)
+	tolerantPods := make([]*v1.Pod, 0)
+	for _, p := range packing.Pods {
+		if pod.ToleratesTaints(&p.Spec, packing.Constraints.ReadinessTaints...) == nil {
+			tolerantPods = append(tolerantPods, p)
+		} else {
+			intolerantPods = append(intolerantPods, p)
+		}
+	}
+	// 4. Asynchronously bind intolerant pods if there are any.
+	if len(intolerantPods) > 0 {
+		go b.asyncBind(ctx, node, packing.Constraints.ReadinessTaints, intolerantPods)
+	}
+	// 5. Synchronously bind tolerant (immediate) pods
+	errs := make([]error, len(tolerantPods))
+	workqueue.ParallelizeUntil(ctx, len(tolerantPods), len(tolerantPods), func(index int) {
+		errs[index] = b.bind(ctx, node, tolerantPods[index])
+	})
+	err := multierr.Combine(errs...)
+	logging.FromContext(ctx).Infof("Immediately bound %d out of %d pod(s) to node %s", len(tolerantPods)-len(multierr.Errors(err)), len(tolerantPods), node.Name)
+	return err
+}
+
+func (b *Binder) asyncBind(ctx context.Context, node *v1.Node, readinessTaints []v1.Taint, pods []*v1.Pod) {
+	logger := logging.FromContext(ctx)
+	logger.Infof("%d pods are not yet schedulable on node %s due to existing node readiness taints", len(pods), node.Name)
+	node, err := b.waitForNodeReadiness(ctx, node, readinessTaints)
+	if err != nil {
+		logger.Warnf("Asynchronous bind failed while waiting for node to become ready, %w", err)
+		b.triggerReconcileFor(ctx, pods...)
+		return
+	}
+	errs := make([]error, len(pods))
+	workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(index int) {
+		pod := pods[index]
+		err := b.bind(ctx, node, pod)
+		if err != nil {
+			if !errors.IsNotFound(err) {
+				logger.Infof("failed to bind pod %s/%s to node %s, %w", pod.Namespace, pod.Name, node.Name, err)
+				b.triggerReconcileFor(ctx, pod)
+			} else {
+				logger.Debugf("failed to bind pod %s/%s to node %s, %w", pod.Namespace, pod.Name, node.Name, err)
+			}
+		}
+		errs[index] = err
+	})
+	err = multierr.Combine(errs...)
+	logger.Infof("Asynchronously bound %d out of %d pod(s) to node %s", len(pods)-len(multierr.Errors(err)), len(pods), node.Name)
+}
+
+func (b *Binder) triggerReconcileFor(ctx context.Context, pods ...*v1.Pod) {
+	logger := logging.FromContext(ctx)
+	for _, pod := range pods {
+		// Touch (update) pod to make sure it triggers another reconciliation for the corresponding
+		// provisioner.
+		stored := pod.DeepCopy()
+		// TODO what exactly shall we update status.Conditions or some metadata.annotation?
+		pod.ObjectMeta.Annotations[v1alpha3.AsyncBindFailureAnnotationKey] = time.Now().String()
+		err := b.KubeClient.Patch(ctx, pod, client.StrategicMergeFrom(stored), &client.PatchOptions{})
+		if err != nil {
+			logger.Warnf("failed to update status of pod %s/%s, %w", pod.Namespace, pod.Name, err)
+		}
+	}
 }
 
 func (b *Binder) waitForNodeReadiness(ctx context.Context, node *v1.Node, readinessTaints []v1.Taint) (*v1.Node, error) {
@@ -123,7 +135,7 @@ func (b *Binder) waitForNodeReadiness(ctx context.Context, node *v1.Node, readinessTaints []v1.Taint) (*v1.Node, error) {
 		if !errors.IsNotFound(err) {
 			return nil, fmt.Errorf("node %s was removed, %w", node.Name, err)
 		}
-		if !hasAnyTaint(node, readinessTaints) {
+		if !hasAnyReadinessTaint(node, readinessTaints) {
 			break
 		}
 		event := <-nodeWatch.ResultChan()
@@ -146,9 +158,13 @@ func (b *Binder) waitForNodeReadiness(ctx context.Context, node *v1.Node, readinessTaints []v1.Taint) (*v1.Node, error) {
 	return node, nil
 }
 
-func hasAnyTaint(node *v1.Node, taints []v1.Taint) bool {
+func hasAnyReadinessTaint(node *v1.Node, readinessTaints []v1.Taint) bool {
 	for _, nodeTaint := range node.Spec.Taints {
-		for _, readinessTaint := range taints {
+		// Ignore Karpenter's own readiness taint
+		if nodeTaint.Key == v1alpha3.NotReadyTaintKey {
+			continue
+		}
+		for _, readinessTaint := range readinessTaints {
 			if nodeTaint.Key == readinessTaint.Key && nodeTaint.Value == readinessTaint.Value && nodeTaint.Effect == readinessTaint.Effect {
 				return true
 			}
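The core of the change in `Bind`: pods that tolerate every readiness taint are bound synchronously, while the rest are handed off to `asyncBind`, which waits for the readiness taints to clear and re-triggers reconciliation on failure. The following self-contained sketch condenses that partition-and-split flow using plain goroutines; the simplified `Pod`/`Taint` types, `toleratesAll`, and the sleep standing in for `waitForNodeReadiness` are illustrative assumptions rather than the karpenter types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Taint struct{ Key, Value, Effect string }

type Pod struct {
	Name        string
	Tolerations []Taint // simplified: a toleration matches a taint exactly
}

// toleratesAll is a stand-in for pod.ToleratesTaints: true only if every
// readiness taint is covered by one of the pod's tolerations.
func toleratesAll(p Pod, taints []Taint) bool {
	for _, taint := range taints {
		ok := false
		for _, tol := range p.Tolerations {
			if tol == taint {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

// bindAll binds pods in parallel, like workqueue.ParallelizeUntil in the real code.
func bindAll(pods []Pod, node string) {
	var wg sync.WaitGroup
	for _, p := range pods {
		wg.Add(1)
		go func(p Pod) {
			defer wg.Done()
			fmt.Printf("bound %s to %s\n", p.Name, node)
		}(p)
	}
	wg.Wait()
}

func main() {
	readiness := []Taint{{Key: "example.com/cni-not-ready", Effect: "NoSchedule"}}
	pods := []Pod{
		{Name: "tolerant", Tolerations: readiness},
		{Name: "intolerant"},
	}
	// Partition into immediately schedulable and not-yet-schedulable pods.
	var tolerant, intolerant []Pod
	for _, p := range pods {
		if toleratesAll(p, readiness) {
			tolerant = append(tolerant, p)
		} else {
			intolerant = append(intolerant, p)
		}
	}
	// Asynchronous path: wait for readiness taints to be removed, then bind.
	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(20 * time.Millisecond) // stand-in for waitForNodeReadiness
		bindAll(intolerant, "node-1")
	}()
	// Synchronous path: bind tolerant pods right away.
	bindAll(tolerant, "node-1")
	<-done // only for the demo; the real controller returns without waiting
}
```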
15 changes: 15 additions & 0 deletions pkg/controllers/allocation/controller.go
@@ -22,6 +22,7 @@ import (
 	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
 	"github.com/awslabs/karpenter/pkg/cloudprovider"
 	"github.com/awslabs/karpenter/pkg/packing"
+	"github.com/awslabs/karpenter/pkg/utils/node"
 	"github.com/awslabs/karpenter/pkg/utils/result"
 	"go.uber.org/multierr"
 	"golang.org/x/time/rate"
@@ -124,6 +125,20 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
 			taints := make([]v1.Taint, 0)
 			taints = append(taints, packing.Constraints.Taints...)
 			taints = append(taints, packing.Constraints.ReadinessTaints...)
+			// Add Karpenter's own readiness taint to make sure it is set by the provisioner.
+			// We taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
+			// from scheduling pods before we're able to bind them ourselves. The kube
+			// scheduler has an eventually consistent cache of nodes and pods, so it's
+			// possible for it to see a provisioned node before it sees the pods bound
+			// to it. This creates an edge case where other pending pods may be bound to
+			// the node by the kube scheduler, causing OutOfCPU errors when the
+			// binpacked pods race to bind to the same node. The system eventually
+			// heals, but causes delays from additional provisioning (thrash). This
+			// taint will be removed by the node controller when a node is marked ready.
+			taints = node.UniqueTaints(taints, v1.Taint{
+				Key:    v1alpha3.NotReadyTaintKey,
+				Effect: v1.TaintEffectNoSchedule,
+			})
 			errs[index] = <-c.CloudProvider.Create(ctx, provisioner, packing, func(node *v1.Node) error {
 				node.Labels = packing.Constraints.Labels
 				node.Spec.Taints = taints
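With this change the controller, not the binder, assembles the full taint set of a provisioned node: the provisioner's taints, its readiness taints, and Karpenter's own `karpenter.sh/not-ready` startup taint, merged through `node.UniqueTaints`. A rough sketch of that composition; the literal taint key and the plain `append` in place of `node.UniqueTaints` are simplifications for illustration.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// Assumed literal for v1alpha3.NotReadyTaintKey.
const notReadyTaintKey = "karpenter.sh/not-ready"

func main() {
	// Taints the provisioner asks for, plus readiness taints that third-party
	// agents (CNI, CSI, ...) are expected to remove once the node is ready.
	provisionerTaints := []v1.Taint{{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule}}
	readinessTaints := []v1.Taint{{Key: "example.com/cni-not-ready", Effect: v1.TaintEffectNoSchedule}}

	taints := append([]v1.Taint{}, provisionerTaints...)
	taints = append(taints, readinessTaints...)
	// Karpenter's own startup taint keeps the kube-scheduler from racing the
	// controller's binding decisions; the node controller removes it once the
	// node is marked ready. The real code merges via node.UniqueTaints so a
	// taint listed in both inputs is not duplicated.
	taints = append(taints, v1.Taint{Key: notReadyTaintKey, Effect: v1.TaintEffectNoSchedule})

	n := &v1.Node{}
	n.Spec.Taints = taints
	fmt.Println(n.Spec.Taints)
}
```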
32 changes: 32 additions & 0 deletions pkg/utils/node/scheduling.go
@@ -0,0 +1,32 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package node
+
+import v1 "k8s.io/api/core/v1"
+
+func UniqueTaints(taints []v1.Taint, extraTaints ...v1.Taint) []v1.Taint {
+	unique := make([]v1.Taint, 0)
+	for _, input := range [][]v1.Taint{taints, extraTaints} {
+		for _, t := range input {
+			// Only append taints that are not already present in the result,
+			// comparing by key, value, and effect.
+			duplicate := false
+			for _, u := range unique {
+				if t.Key == u.Key && t.Value == u.Value && t.Effect == u.Effect {
+					duplicate = true
+					break
+				}
+			}
+			if !duplicate {
+				unique = append(unique, t)
+			}
+		}
+	}
+	return unique
+}
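A hypothetical test sketch (not part of this commit) that pins down the intended `UniqueTaints` behaviour: a taint that appears in both inputs, or more than once in one input, shows up exactly once in the result.

```go
package node

import (
	"testing"

	v1 "k8s.io/api/core/v1"
)

// Duplicates across and within the two inputs should collapse to a single entry.
func TestUniqueTaintsDeduplicates(t *testing.T) {
	ready := v1.Taint{Key: "example.com/cni-not-ready", Effect: v1.TaintEffectNoSchedule}
	other := v1.Taint{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule}

	got := UniqueTaints([]v1.Taint{ready, other, ready}, ready)
	if len(got) != 2 {
		t.Fatalf("expected 2 unique taints, got %d: %v", len(got), got)
	}
}
```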
5 changes: 5 additions & 0 deletions pkg/utils/pod/scheduling.go
@@ -17,6 +17,7 @@ package pod
 import (
 	"fmt"
 
+	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
 	"go.uber.org/multierr"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -58,6 +59,10 @@ func ToleratesTaints(spec *v1.PodSpec, taints ...v1.Taint) (err error) {
 
 // Tolerates returns true if one of the tolerations tolerate the taint
 func Tolerates(tolerations []v1.Toleration, taint v1.Taint) bool {
+	// Always tolerate Karpenter's own readiness taint.
+	if taint.Key == v1alpha3.NotReadyTaintKey {
+		return true
+	}
 	for _, t := range tolerations {
 		if t.ToleratesTaint(&taint) {
 			return true
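A similar hypothetical test sketch for the new `Tolerates` short-circuit: Karpenter's own not-ready taint is always tolerated, while readiness taints still require a matching toleration.

```go
package pod

import (
	"testing"

	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
	v1 "k8s.io/api/core/v1"
)

func TestToleratesReadinessTaints(t *testing.T) {
	notReady := v1.Taint{Key: v1alpha3.NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule}
	readiness := v1.Taint{Key: "example.com/cni-not-ready", Effect: v1.TaintEffectNoSchedule}

	// Karpenter's own startup taint never blocks its own binding decisions.
	if !Tolerates(nil, notReady) {
		t.Fatal("expected Karpenter's not-ready taint to always be tolerated")
	}
	// A readiness taint without a matching toleration is not tolerated.
	if Tolerates(nil, readiness) {
		t.Fatal("expected a readiness taint to require a matching toleration")
	}
	// An explicit toleration makes the readiness taint schedulable.
	tolerations := []v1.Toleration{{Key: "example.com/cni-not-ready", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule}}
	if !Tolerates(tolerations, readiness) {
		t.Fatal("expected the readiness taint to be tolerated with an explicit toleration")
	}
}
```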
