Resolved a bug where a pod scheduled during provisioning would cause a waiting goroutine #938

Merged: 1 commit on Dec 9, 2021
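In outline, the change replaces the per-pod results channel (one error sent back to each Add caller) with a single wait channel: each Add blocks on a receive, the provisioning loop releases one waiter per batched pod, and close(p.wait) on shutdown unblocks every caller still waiting. A minimal sketch of that shutdown shape, with simplified names rather than the PR's exact code:

package main

import (
	"context"
	"fmt"
)

func main() {
	running, stop := context.WithCancel(context.Background())
	wait := make(chan struct{})
	go func() {
		<-running.Done() // the real loop re-checks running.Err() between batches
		close(wait)      // shutdown: one close releases every blocked caller
	}()
	stop() // Stop in the PR
	<-wait // an Add() caller parked here is released by the close
	fmt.Println("caller released on shutdown")
}
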
129 changes: 57 additions & 72 deletions pkg/controllers/provisioning/provisioner.go
@@ -47,12 +47,12 @@ var (
)

func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
c, stop := context.WithCancel(ctx)
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
pods: make(chan *v1.Pod),
results: make(chan error),
done: c.Done(),
wait: make(chan struct{}),
running: running,
Stop: stop,
cloudProvider: cloudProvider,
kubeClient: kubeClient,
@@ -61,17 +61,13 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
packer: binpacking.NewPacker(kubeClient, cloudProvider),
}
go func() {
for {
select {
case <-p.done:
logging.FromContext(ctx).Info("Stopping provisioner")
return
default:
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
for p.running.Err() == nil {
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
}
close(p.wait)
logging.FromContext(ctx).Info("Stopping provisioner")
}()
return p
}
@@ -81,9 +77,9 @@ type Provisioner struct {
// State
*v1alpha5.Provisioner
pods chan *v1.Pod
results chan error
wait chan struct{}
running context.Context
Stop context.CancelFunc
done <-chan struct{}
// Dependencies
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
@@ -92,18 +88,30 @@ type Provisioner struct {
packer *binpacking.Packer
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
select {
case p.pods <- pod: // Block until pod is enqueued
<-p.wait
case <-p.running.Done(): // Leave if closed
}
}
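Callers now treat Add as hand-off-and-block: it returns once the batch containing the pod has been processed, or once the provisioner is stopping, so a typical caller requeues to verify scheduling on the next pass. A hypothetical caller sketch (reconcilePod is illustrative and assumes this file's imports plus controller-runtime's reconcile package):

// Hypothetical caller: hand the pod off, block until its batch completes,
// then requeue so the next reconcile can verify the pod was actually bound.
func reconcilePod(ctx context.Context, p *Provisioner, pod *v1.Pod) (reconcile.Result, error) {
	p.Add(ctx, pod)
	return reconcile.Result{RequeueAfter: time.Second}, nil
}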

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Wait for a batch of pods
pods := p.Batch(ctx)
// Communicate the result of the provisioning loop to each of the pods.
// Wait for a batch of pods, release when done
pods := p.batch(ctx)
defer func() {
for i := 0; i < len(pods); i++ {
select {
case p.results <- err: // Block until result is communicated
case <-p.done: // Leave if closed
}
p.wait <- struct{}{}
Contributor Author: Previously, we would filter pods before releasing the waiting threads, which resulted in miscounting when pods were bound mid-batch (see the sketch after this function).

}
}()
// Ensure pods are still provisionable
pods, err = p.filter(ctx, pods)
if err != nil {
return fmt.Errorf("filtering provisionable pods, %w", err)
}
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
if err != nil {
@@ -117,39 +125,23 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
}
for _, packing := range packings {
if err := p.launch(ctx, schedule.Constraints, packing); err != nil {
logging.FromContext(ctx).Error("Could not launch node, %s", err.Error())
logging.FromContext(ctx).Errorf("Could not launch node, %s", err.Error())
continue
}
}
}
return nil
}
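The review comment above is the heart of the fix: one waiter must be released per pod that entered the batch, counted before any filtering, or an Add() caller whose pod was bound mid-batch is never woken. A self-contained sketch of that failure mode, with wait standing in for the unbuffered p.wait channel:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wait := make(chan struct{})
	batched := 3  // Add() callers currently parked on <-wait
	filtered := 2 // pods left after one was bound (and filtered out) mid-batch

	var wg sync.WaitGroup
	for i := 0; i < batched; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); <-wait }() // stands in for an Add() caller
	}
	for i := 0; i < filtered; i++ { // buggy ordering: release the filtered count
		wait <- struct{}{}
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
		fmt.Println("all callers released")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("leaked: one caller is still waiting") // the reported bug
	}
}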

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) (err error) {
select {
case p.pods <- pod: // Block until pod is enqueued
case <-p.done: // Leave if closed
}
select {
case err = <-p.results: // Block until result is sent
case <-p.done: // Leave if closed
}
return err
}

// Batch returns a slice of enqueued pods after idle or timeout
func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
// Start the batching window after the first pod is received
pods = append(pods, <-p.pods)
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(MinBatchDuration)
start := time.Now()
defer func() {
pods = p.FilterProvisionable(ctx, pods)
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
@@ -170,36 +162,47 @@ func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
}
}
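batch opens the window on the first pod, extends it whenever another pod arrives, and hard-caps the whole window: an idle timer and a max timer racing in one select. A standalone sketch of the two-timer pattern (idleDur and maxDur stand in for MinBatchDuration and MaxBatchDuration; int items stand in for pods):

package main

import (
	"fmt"
	"time"
)

func batchWindow(in <-chan int, idleDur, maxDur time.Duration) (items []int) {
	items = append(items, <-in) // the first item opens the window
	idle := time.NewTimer(idleDur)
	timeout := time.NewTimer(maxDur)
	defer idle.Stop()
	defer timeout.Stop()
	for {
		select {
		case item := <-in:
			items = append(items, item)
			if !idle.Stop() {
				<-idle.C // drain a fired-but-unread timer before resetting
			}
			idle.Reset(idleDur) // the quiet period starts over
		case <-idle.C: // idle for idleDur: flush the batch
			return items
		case <-timeout.C: // hard cap on the total window
			return items
		}
	}
}

func main() {
	in := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		in <- i
	}
	fmt.Println(batchWindow(in, 50*time.Millisecond, time.Second)) // [1 2 3]
}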

// FilterProvisionable removes pods that have been assigned a node.
// filter removes pods that have been assigned a node.
// This check is needed to prevent duplicate binds when a pod is scheduled to a node
// between the time it was ingested into the scheduler and the time it is included
// in a provisioner batch.
func (p *Provisioner) FilterProvisionable(ctx context.Context, pods []*v1.Pod) []*v1.Pod {
func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
provisionable := []*v1.Pod{}
for _, pod := range pods {
// the original pod should be returned rather than the newly fetched pod in case the scheduler relaxed constraints
original := pod
candidate := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, candidate); err != nil {
logging.FromContext(ctx).Errorf("Could not verify pod \"%s/%s\" is provisionable, %s", pod.Namespace, pod.Name, err.Error())
continue
// Do not mutate the pod in case the scheduler relaxed constraints
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
if candidate.Spec.NodeName == "" {
provisionable = append(provisionable, original)
if stored.Spec.NodeName == "" {
provisionable = append(provisionable, pod)
}
}
return provisionable
return provisionable, nil
}

func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
if err := p.verifyResourceLimits(ctx, p.Provisioner); err != nil {
return fmt.Errorf("limits exceeded, %w", err)
// Check limits
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(p.Provisioner), latest); err != nil {
return fmt.Errorf("getting current resource usage, %w", err)
}
if err := p.Spec.Limits.ExceededBy(latest.Status.Resources); err != nil {
return err
}
// Create and Bind
pods := make(chan []*v1.Pod, len(packing.Pods))
defer close(pods)
for _, ps := range packing.Pods {
pods <- ps
}
packedPods := queueFor(packing.Pods)
return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.bind(ctx, node, <-packedPods)
return p.bind(ctx, node, <-pods)
})
}
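Instead of the removed queueFor helper, launch now inlines the same idea: pre-fill a buffered channel with one pod group per node so each bind callback can pop its own group without extra coordination. A standalone sketch of that handoff, closing eagerly where launch defers the close (string slices stand in for pod groups):

package main

import "fmt"

func main() {
	groups := [][]string{{"pod-a"}, {"pod-b", "pod-c"}}
	queue := make(chan []string, len(groups)) // one slot per node's pod group
	for _, g := range groups {
		queue <- g // never blocks: the buffer fits every group
	}
	close(queue) // receivers can still drain a closed channel's buffer

	for i := 0; i < len(groups); i++ { // stands in for the per-node callbacks
		fmt.Println("binding", <-queue)
	}
}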

@@ -246,24 +249,6 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (
return nil
}

func (p *Provisioner) verifyResourceLimits(ctx context.Context, provisioner *v1alpha5.Provisioner) error {
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(provisioner), latest); err != nil {
return fmt.Errorf("getting current resource usage, %w", err)
}
return provisioner.Spec.Limits.ExceededBy(latest.Status.Resources)
}

// Thread safe channel to pop off packed pod slices
func queueFor(pods [][]*v1.Pod) <-chan []*v1.Pod {
queue := make(chan []*v1.Pod, len(pods))
defer close(queue)
for _, ps := range pods {
queue <- ps
}
return queue
}

var bindTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
20 changes: 9 additions & 11 deletions pkg/controllers/scheduling/controller.go
@@ -70,21 +70,22 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, nil
}
// Select a provisioner, wait for it to bind the pod, and verify scheduling succeeded in the next loop
provisioner, err := c.selectProvisioner(ctx, pod)
if err != nil {
if err := c.selectProvisioner(ctx, pod); err != nil {
logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
return reconcile.Result{}, err
}
provisioner.Add(ctx, pod)
return reconcile.Result{RequeueAfter: time.Second * 1}, nil
}

func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (*provisioning.Provisioner, error) {
func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
// Relax preferences if pod has previously failed to schedule.
c.preferences.Relax(ctx, pod)
// Pick provisioner
var provisioner *provisioning.Provisioner
var errs error
provisioners := c.provisioners.List(ctx)
if len(provisioners) == 0 {
return nil
}
for _, candidate := range c.provisioners.List(ctx) {
if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil {
errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err))
@@ -94,13 +95,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (*provi
}
}
if provisioner == nil {
err := fmt.Errorf("matched 0/%d provisioners", len(multierr.Errors(errs)))
if errs != nil {
err = fmt.Errorf("%s, %w", err, errs)
}
return nil, err
return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}
return provisioner, nil
provisioner.Add(ctx, pod)
return nil
}
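selectProvisioner folds every candidate's rejection into a single error with multierr, so "matched 0/N provisioners" arrives with each reason attached. A standalone sketch of that aggregation pattern using go.uber.org/multierr (pickFirst and its string candidates are illustrative):

package main

import (
	"fmt"

	"go.uber.org/multierr"
)

func pickFirst(candidates []string, validate func(string) error) (string, error) {
	var errs error
	for _, c := range candidates {
		if err := validate(c); err != nil {
			errs = multierr.Append(errs, fmt.Errorf("tried %s: %w", c, err))
			continue
		}
		return c, nil // the first candidate that validates wins
	}
	return "", fmt.Errorf("matched 0/%d candidates, %w", len(multierr.Errors(errs)), errs)
}

func main() {
	_, err := pickFirst([]string{"a", "b"}, func(string) error { return fmt.Errorf("no fit") })
	fmt.Println(err) // matched 0/2 candidates, tried a: no fit; tried b: no fit
}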

func isProvisionable(p *v1.Pod) bool {