Continued scheduling refactor (#938)
ellistarn authored Dec 9, 2021
1 parent f39dbb4 commit 10c13f1
Showing 2 changed files with 66 additions and 83 deletions.
129 changes: 57 additions & 72 deletions pkg/controllers/provisioning/provisioner.go
@@ -47,12 +47,12 @@ var (
)

func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Provisioner {
c, stop := context.WithCancel(ctx)
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
pods: make(chan *v1.Pod),
results: make(chan error),
done: c.Done(),
wait: make(chan struct{}),
running: running,
Stop: stop,
cloudProvider: cloudProvider,
kubeClient: kubeClient,
@@ -61,17 +61,13 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
packer: binpacking.NewPacker(kubeClient, cloudProvider),
}
go func() {
for {
select {
case <-p.done:
logging.FromContext(ctx).Info("Stopping provisioner")
return
default:
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
for p.running.Err() == nil {
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
}
close(p.wait)
logging.FromContext(ctx).Info("Stopping provisioner")
}()
return p
}
@@ -81,9 +77,9 @@ type Provisioner struct {
// State
*v1alpha5.Provisioner
pods chan *v1.Pod
results chan error
wait chan struct{}
running context.Context
Stop context.CancelFunc
done <-chan struct{}
// Dependencies
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
@@ -92,18 +88,30 @@ type Provisioner struct {
packer *binpacking.Packer
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) {
select {
case p.pods <- pod: // Block until pod is enqueued
<-p.wait
case <-p.running.Done(): // Leave if closed
}
}

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Wait for a batch of pods
pods := p.Batch(ctx)
// Communicate the result of the provisioning loop to each of the pods.
// Wait for a batch of pods, release when done
pods := p.batch(ctx)
defer func() {
for i := 0; i < len(pods); i++ {
select {
case p.results <- err: // Block until result is communicated
case <-p.done: // Leave if closed
}
p.wait <- struct{}{}
}
}()
// Ensure pods are still provisionable
pods, err = p.filter(ctx, pods)
if err != nil {
return fmt.Errorf("filtering provisionable pods, %w", err)
}
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
if err != nil {
@@ -117,39 +125,23 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
}
for _, packing := range packings {
if err := p.launch(ctx, schedule.Constraints, packing); err != nil {
logging.FromContext(ctx).Error("Could not launch node, %s", err.Error())
logging.FromContext(ctx).Errorf("Could not launch node, %s", err.Error())
continue
}
}
}
return nil
}

// Add a pod to the provisioner and block until it's processed. The caller
// is responsible for verifying that the pod was scheduled correctly. In the
// future, this may be expanded to include concepts such as retriable errors.
func (p *Provisioner) Add(ctx context.Context, pod *v1.Pod) (err error) {
select {
case p.pods <- pod: // Block until pod is enqueued
case <-p.done: // Leave if closed
}
select {
case err = <-p.results: // Block until result is sent
case <-p.done: // Leave if closed
}
return err
}

// Batch returns a slice of enqueued pods after idle or timeout
func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
func (p *Provisioner) batch(ctx context.Context) (pods []*v1.Pod) {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
// Start the batching window after the first pod is received
pods = append(pods, <-p.pods)
timeout := time.NewTimer(MaxBatchDuration)
idle := time.NewTimer(MinBatchDuration)
start := time.Now()
defer func() {
pods = p.FilterProvisionable(ctx, pods)
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
@@ -170,36 +162,47 @@ func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
}
}

// FilterProvisionable removes pods that have been assigned a node.
// filter removes pods that have been assigned a node.
// This check is needed to prevent duplicate binds when a pod is scheduled to a node
// between the time it was ingested into the scheduler and the time it is included
// in a provisioner batch.
func (p *Provisioner) FilterProvisionable(ctx context.Context, pods []*v1.Pod) []*v1.Pod {
func (p *Provisioner) filter(ctx context.Context, pods []*v1.Pod) ([]*v1.Pod, error) {
provisionable := []*v1.Pod{}
for _, pod := range pods {
// the original pod should be returned rather than the newly fetched pod in case the scheduler relaxed constraints
original := pod
candidate := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, candidate); err != nil {
logging.FromContext(ctx).Errorf("Could not verify pod \"%s/%s\" is provisionable, %s", pod.Namespace, pod.Name, err.Error())
continue
// Do not mutate the pod in case the scheduler relaxed constraints
stored := &v1.Pod{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, stored); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
if candidate.Spec.NodeName == "" {
provisionable = append(provisionable, original)
if stored.Spec.NodeName == "" {
provisionable = append(provisionable, pod)
}
}
return provisionable
return provisionable, nil
}

func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
if err := p.verifyResourceLimits(ctx, p.Provisioner); err != nil {
return fmt.Errorf("limits exceeded, %w", err)
// Check limits
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(p.Provisioner), latest); err != nil {
return fmt.Errorf("getting current resource usage, %w", err)
}
if err := p.Spec.Limits.ExceededBy(latest.Status.Resources); err != nil {
return err
}
// Create and Bind
pods := make(chan []*v1.Pod, len(packing.Pods))
defer close(pods)
for _, ps := range packing.Pods {
pods <- ps
}
packedPods := queueFor(packing.Pods)
return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.bind(ctx, node, <-packedPods)
return p.bind(ctx, node, <-pods)
})
}

@@ -246,24 +249,6 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (
return nil
}

func (p *Provisioner) verifyResourceLimits(ctx context.Context, provisioner *v1alpha5.Provisioner) error {
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(provisioner), latest); err != nil {
return fmt.Errorf("getting current resource usage, %w", err)
}
return provisioner.Spec.Limits.ExceededBy(latest.Status.Resources)
}

// Thread safe channel to pop off packed pod slices
func queueFor(pods [][]*v1.Pod) <-chan []*v1.Pod {
queue := make(chan []*v1.Pod, len(pods))
defer close(queue)
for _, ps := range pods {
queue <- ps
}
return queue
}

var bindTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
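For context on the provisioner.go changes above: the per-pod results channel and done signal are replaced by a wait channel plus a long-lived running context, so callers of Add block until their batch has been handled and the loop exits cleanly on cancellation. Below is a minimal, self-contained sketch of that handshake with hypothetical names and timings (worker, Add, a 50ms idle window); it is not Karpenter's code, only an illustration of the pattern.

package main

import (
	"context"
	"fmt"
	"time"
)

// worker mirrors the Provisioner's shape: an unbuffered item channel, a wait
// channel used to release blocked callers, and a context that keeps the loop alive.
type worker struct {
	items   chan string
	wait    chan struct{}
	running context.Context
	Stop    context.CancelFunc
}

func newWorker(ctx context.Context) *worker {
	running, stop := context.WithCancel(ctx)
	w := &worker{
		items:   make(chan string),
		wait:    make(chan struct{}),
		running: running,
		Stop:    stop,
	}
	go func() {
		for w.running.Err() == nil {
			w.process()
		}
		close(w.wait) // release any callers still blocked in Add
	}()
	return w
}

// Add blocks until the item is enqueued and its batch has been processed.
func (w *worker) Add(item string) {
	select {
	case w.items <- item:
		<-w.wait
	case <-w.running.Done():
	}
}

// process collects a batch (the first item opens the window, an idle timer
// closes it), handles it, then releases one blocked caller per item.
func (w *worker) process() {
	batch := []string{<-w.items}
	idle := time.NewTimer(50 * time.Millisecond)
	for {
		select {
		case item := <-w.items:
			batch = append(batch, item)
			idle.Reset(50 * time.Millisecond)
		case <-idle.C:
			fmt.Printf("processed a batch of %d items\n", len(batch))
			for range batch {
				w.wait <- struct{}{}
			}
			return
		}
	}
}

func main() {
	w := newWorker(context.Background())
	done := make(chan struct{})
	for i := 0; i < 3; i++ {
		go func(i int) {
			w.Add(fmt.Sprintf("pod-%d", i))
			done <- struct{}{}
		}(i)
	}
	for i := 0; i < 3; i++ {
		<-done
	}
	w.Stop()
}

The unbuffered wait channel yields exactly one release per batched item, and closing it on shutdown unblocks any caller still parked in Add.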
20 changes: 9 additions & 11 deletions pkg/controllers/scheduling/controller.go
@@ -70,21 +70,22 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, nil
}
// Select a provisioner, wait for it to bind the pod, and verify scheduling succeeded in the next loop
provisioner, err := c.selectProvisioner(ctx, pod)
if err != nil {
if err := c.selectProvisioner(ctx, pod); err != nil {
logging.FromContext(ctx).Debugf("Could not schedule pod, %s", err.Error())
return reconcile.Result{}, err
}
provisioner.Add(ctx, pod)
return reconcile.Result{RequeueAfter: time.Second * 1}, nil
}

func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (*provisioning.Provisioner, error) {
func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
// Relax preferences if pod has previously failed to schedule.
c.preferences.Relax(ctx, pod)
// Pick provisioner
var provisioner *provisioning.Provisioner
var errs error
provisioners := c.provisioners.List(ctx)
if len(provisioners) == 0 {
return nil
}
for _, candidate := range c.provisioners.List(ctx) {
if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil {
errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err))
@@ -94,13 +95,10 @@ func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (*provi
}
}
if provisioner == nil {
err := fmt.Errorf("matched 0/%d provisioners", len(multierr.Errors(errs)))
if errs != nil {
err = fmt.Errorf("%s, %w", err, errs)
}
return nil, err
return fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}
return provisioner, nil
provisioner.Add(ctx, pod)
return nil
}

func isProvisionable(p *v1.Pod) bool {
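The controller.go change above folds provisioner selection and the Add call into selectProvisioner, aggregating one error per rejected candidate and returning early when no provisioners exist. A short sketch of that aggregation pattern with assumed types (candidate and its fits field are hypothetical, not Karpenter's API), using go.uber.org/multierr as the diff does:

package main

import (
	"fmt"

	"go.uber.org/multierr"
)

type candidate struct {
	name string
	fits bool
}

// selectCandidate tries each candidate in order, collecting one error per
// rejection; if none fit, it returns the aggregate along with a match count.
func selectCandidate(candidates []candidate) (string, error) {
	if len(candidates) == 0 {
		return "", nil // mirrors the early return when no provisioners are registered
	}
	var errs error
	for _, c := range candidates {
		if !c.fits {
			errs = multierr.Append(errs, fmt.Errorf("tried %s: does not fit", c.name))
			continue
		}
		return c.name, nil
	}
	return "", fmt.Errorf("matched 0/%d candidates, %w", len(multierr.Errors(errs)), errs)
}

func main() {
	if _, err := selectCandidate([]candidate{{name: "default"}, {name: "gpu"}}); err != nil {
		fmt.Println(err)
	}
}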
