Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix duplicate bind issues #832

Merged
merged 4 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
done: ctx.Done(),
Stop: cancelFunc,
cloudProvider: c.cloudProvider,
kubeClient: c.kubeClient,
scheduler: c.scheduler,
launcher: c.launcher,
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
Expand All @@ -43,6 +45,7 @@ type Provisioner struct {
Stop context.CancelFunc
// Dependencies
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
scheduler *scheduling.Scheduler
launcher *Launcher
}
Expand Down Expand Up @@ -117,6 +120,7 @@ func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
idle := time.NewTimer(MinBatchDuration)
start := time.Now()
defer func() {
pods = p.FilterProvisionable(ctx, pods)
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
Expand All @@ -136,3 +140,23 @@ func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
}
}
}

// FilterProvisionable removes pods that have been assigned a node or that no
// longer exist.
// This check is needed to prevent duplicate binds when a pod is scheduled to a node
// between the time it was ingested into the scheduler and the time it is included
// in a provisioner batch.
//
// Each pod is re-read through the provisioner's kube client. The returned slice
// holds the caller's original pod pointers (never the freshly fetched copies)
// and is always non-nil.
func (p *Provisioner) FilterProvisionable(ctx context.Context, pods []*v1.Pod) []*v1.Pod {
	provisionable := make([]*v1.Pod, 0, len(pods))
	for _, pod := range pods {
		// The original pod should be returned rather than the newly fetched pod
		// in case the scheduler relaxed constraints.
		candidate := &v1.Pod{}
		if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, candidate); err != nil {
			// IgnoreNotFound returns nil only for NotFound errors: the pod was
			// deleted after it was batched, so there is nothing to provision for.
			if client.IgnoreNotFound(err) == nil {
				continue
			}
			logging.FromContext(ctx).Errorf("Unexpected error retrieving pod \"%s/%s\" while checking if it is provisionable, %s", pod.Namespace, pod.Name, err.Error())
			// Best effort: the current state of the pod is unknown, so keep it in
			// the batch rather than silently dropping a pod that may still need
			// capacity.
			provisionable = append(provisionable, pod)
			continue
		}
		if candidate.Spec.NodeName == "" {
			provisionable = append(provisionable, pod)
		}
	}
	return provisionable
}
18 changes: 3 additions & 15 deletions pkg/controllers/scheduling/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, err
}
// Ensure the pod can be provisioned
if err := isUnschedulable(pod); err != nil {
if !isProvisionable(pod) {
return reconcile.Result{}, nil
}
if err := validate(pod); err != nil {
Expand Down Expand Up @@ -102,20 +102,8 @@ func (c *Controller) Schedule(ctx context.Context, pod *v1.Pod) error {
return nil
}

func isUnschedulable(p *v1.Pod) error {
if p.Spec.NodeName != "" {
return fmt.Errorf("already scheduled")
}
if !pod.FailedToSchedule(p) {
return fmt.Errorf("awaiting scheduling")
}
if pod.IsOwnedByDaemonSet(p) {
return fmt.Errorf("owned by daemonset")
}
if pod.IsOwnedByNode(p) {
return fmt.Errorf("owned by node")
}
return nil
// isProvisionable reports whether the pod has no node assigned, is reported by
// pod.FailedToSchedule, and is owned by neither a daemonset nor a node. The
// checks run in that order and stop at the first one that fails.
func isProvisionable(p *v1.Pod) bool {
	if p.Spec.NodeName != "" {
		return false
	}
	if !pod.FailedToSchedule(p) {
		return false
	}
	if pod.IsOwnedByDaemonSet(p) {
		return false
	}
	return !pod.IsOwnedByNode(p)
}

func validate(p *v1.Pod) error {
Expand Down