Implemented support for multiple provisioners
ellistarn committed Nov 17, 2021
1 parent e7736b6 commit 0145a76
Showing 11 changed files with 439 additions and 22 deletions.
10 changes: 8 additions & 2 deletions cmd/controller/main.go
@@ -17,14 +17,16 @@ package main
import (
"context"
"fmt"
"sync"

"github.com/awslabs/karpenter/pkg/apis"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/metrics"
"github.com/awslabs/karpenter/pkg/controllers/node"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/controllers/scheduling"
"github.com/awslabs/karpenter/pkg/controllers/termination"
"github.com/awslabs/karpenter/pkg/utils/options"
"github.com/awslabs/karpenter/pkg/utils/restconfig"
@@ -81,9 +83,13 @@ func main() {
HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort),
})

provisioners := &sync.Map{}

if err := manager.RegisterControllers(ctx,
allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
// allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider, provisioners),
scheduling.NewController(manager.GetClient(), provisioners),
node.NewController(manager.GetClient()),
metrics.NewController(manager.GetClient(), cloudProvider),
).Start(ctx); err != nil {
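A minimal sketch of the new wiring: the provisioning controller owns a sync.Map of live provisioners keyed by name, and the scheduling controller resolves a pod's provisioner from the same map and hands the pod off. The provisioner type and Enqueue signature below are simplified stand-ins, not the real types from this commit.

package main

import (
	"fmt"
	"sync"
)

// provisioner is a simplified stand-in for the Provisioner type added in
// pkg/controllers/provisioning/provisioner.go.
type provisioner struct {
	name string
}

// Enqueue hands a pod to the provisioner, mimicking the real Enqueue.
func (p *provisioner) Enqueue(podName string) error {
	fmt.Printf("provisioner %q received pod %q\n", p.name, podName)
	return nil
}

func main() {
	// The provisioning controller owns the map: one live provisioner per
	// Provisioner resource it reconciles.
	provisioners := &sync.Map{}
	provisioners.Store("default", &provisioner{name: "default"})
	provisioners.Store("gpu", &provisioner{name: "gpu"})

	// The scheduling controller reads from the same map: it resolves a
	// pod's provisioner by name and hands the pod off.
	if p, ok := provisioners.Load("gpu"); ok {
		_ = p.(*provisioner).Enqueue("training-pod-0")
	}
}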
4 changes: 2 additions & 2 deletions pkg/apis/provisioning/v1alpha5/constraints.go
@@ -49,14 +49,14 @@ func (c *Constraints) Supports(pod *v1.Pod) error {
podRequirements := PodRequirements(pod)
for _, key := range podRequirements.Keys() {
if c.Requirements.Requirement(key).Len() == 0 {
return fmt.Errorf("%s is too constrained", key)
return fmt.Errorf("invalid constraint %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList())
}
}
// The combined requirements are not compatible
combined := c.Requirements.With(podRequirements)
for _, key := range podRequirements.Keys() {
if combined.Requirement(key).Len() == 0 {
return fmt.Errorf("%s is too constrained", key)
return fmt.Errorf("invalid constraint %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList())
}
}
return nil
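A sketch of the intersection check behind the new error message, assuming only that requirements are modeled as value sets per label key (as the sets usage elsewhere in this diff suggests); supports and the hard-coded zones are illustrative.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// supports reports whether a pod's allowed values for a label key overlap
// the provisioner's allowed values, using the error format introduced above.
func supports(key string, provisionerValues, podValues sets.String) error {
	if provisionerValues.Intersection(podValues).Len() == 0 {
		return fmt.Errorf("invalid constraint %q, %v not in %v",
			key, podValues.UnsortedList(), provisionerValues.UnsortedList())
	}
	return nil
}

func main() {
	err := supports(
		"topology.kubernetes.io/zone",
		sets.NewString("us-west-2a", "us-west-2b"),
		sets.NewString("us-east-1a"),
	)
	// Prints something like:
	// invalid constraint "topology.kubernetes.io/zone", [us-east-1a] not in [us-west-2a us-west-2b]
	fmt.Println(err)
}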
1 change: 0 additions & 1 deletion pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.


3 changes: 1 addition & 2 deletions pkg/controllers/allocation/binpacking/packer.go
@@ -97,14 +97,13 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan
if mainPack, ok := packs[key]; ok {
mainPack.NodeQuantity++
mainPack.Pods = append(mainPack.Pods, packing.Pods...)
logging.FromContext(ctx).Debugf("Incremented node count to %d on packing for %d pod(s) with instance type option(s) %v", mainPack.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(mainPack.InstanceTypeOptions))
continue
} else {
packs[key] = packing
}
}
packings = append(packings, packing)
logging.FromContext(ctx).Infof("Computed packing for %d pod(s) with instance type option(s) %s", flattenedLen(packing.Pods...), instanceTypeNames(packing.InstanceTypeOptions))
logging.FromContext(ctx).Infof("Computed packing of %d nodes for %d pod(s) with instance type option(s) %s", packing.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(packing.InstanceTypeOptions))
}
return packings
}
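The log change above reflects how Pack deduplicates packings: packings with identical instance type options collapse into one entry whose node count is incremented. A self-contained sketch of that grouping (the packing struct and string keys are simplifications):

package main

import (
	"fmt"
	"strings"
)

// packing is a simplified stand-in for the packer's per-node result.
type packing struct {
	instanceTypeOptions []string
	pods                []string
	nodeQuantity        int
}

func main() {
	computed := []packing{
		{[]string{"m5.large", "m5.xlarge"}, []string{"a"}, 1},
		{[]string{"m5.large", "m5.xlarge"}, []string{"b"}, 1},
		{[]string{"p3.2xlarge"}, []string{"c"}, 1},
	}
	packs := map[string]*packing{}
	var order []string
	for i := range computed {
		// Key packings by their instance type options; identical options
		// mean another node of the same shape, so increment the count.
		key := strings.Join(computed[i].instanceTypeOptions, ",")
		if mainPack, ok := packs[key]; ok {
			mainPack.nodeQuantity++
			mainPack.pods = append(mainPack.pods, computed[i].pods...)
			continue
		}
		packs[key] = &computed[i]
		order = append(order, key)
	}
	for _, key := range order {
		p := packs[key]
		fmt.Printf("Computed packing of %d nodes for %d pod(s) with instance type option(s) %v\n",
			p.nodeQuantity, len(p.pods), p.instanceTypeOptions)
	}
}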
13 changes: 3 additions & 10 deletions pkg/controllers/allocation/scheduling/scheduler.go
@@ -21,7 +21,6 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/metrics"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
@@ -74,30 +73,24 @@ func NewScheduler(kubeClient client.Client, cloudProvider cloudprovider.CloudPro
func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) (schedules []*Schedule, err error) {
defer metrics.Measure(schedulingDuration.WithLabelValues(provisioner.Name))()

constraints := provisioner.Spec.Constraints.DeepCopy()
constraints.Labels = functional.UnionStringMaps(constraints.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name})
constraints.Requirements = provisioner.Spec.Requirements.
With(globalRequirements(instanceTypes)).
With(v1alpha5.LabelRequirements(constraints.Labels))

// Relax preferences if pods have previously failed to schedule.
s.Preferences.Relax(ctx, pods)
// Inject temporarily adds specific NodeSelectors to pods, which are then
// used by scheduling logic. This isn't strictly necessary, but is a useful
// trick to avoid passing topology decisions through the scheduling code. It
// lets us treat TopologySpreadConstraints as just-in-time NodeSelectors.
if err := s.Topology.Inject(ctx, constraints, pods); err != nil {
if err := s.Topology.Inject(ctx, &provisioner.Spec.Constraints, pods); err != nil {
return nil, fmt.Errorf("injecting topology, %w", err)
}
// Separate pods into schedules of isomorphic scheduling constraints.
schedules, err = s.getSchedules(ctx, constraints, pods)
schedules, err = s.getSchedules(ctx, &provisioner.Spec.Constraints, pods)
if err != nil {
return nil, fmt.Errorf("getting schedules, %w", err)
}
return schedules, nil
}

func globalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
func GlobalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
supported := map[string]sets.String{
v1.LabelInstanceTypeStable: sets.NewString(),
v1.LabelTopologyZone: sets.NewString(),
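The Inject comment above describes the trick of treating TopologySpreadConstraints as just-in-time NodeSelectors. A minimal sketch of that idea; here the chosen zone is hard-coded rather than computed from topology skew:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web-0"}}
	// Pretend the topology logic picked us-west-2a to balance a zonal
	// TopologySpreadConstraint; injecting it as a NodeSelector lets the
	// downstream scheduling code treat it like any other constraint.
	if pod.Spec.NodeSelector == nil {
		pod.Spec.NodeSelector = map[string]string{}
	}
	pod.Spec.NodeSelector[v1.LabelTopologyZone] = "us-west-2a"
	fmt.Printf("pod %s pinned via NodeSelector: %v\n", pod.Name, pod.Spec.NodeSelector)
}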
4 changes: 2 additions & 2 deletions pkg/controllers/node/controller.go
@@ -59,7 +59,7 @@ type Controller struct {

// Reconcile executes a reallocation control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("Node"))
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("node"))
// 1. Retrieve Node, ignore if not provisioned or terminating
stored := &v1.Node{}
if err := c.kubeClient.Get(ctx, req.NamespacedName, stored); err != nil {
@@ -115,7 +115,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
return controllerruntime.
NewControllerManagedBy(m).
Named("Node").
Named("node").
For(&v1.Node{}).
Watches(
// Reconcile all nodes related to a provisioner when it changes.
123 changes: 123 additions & 0 deletions pkg/controllers/provisioning/controller.go
@@ -0,0 +1,123 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provisioning

import (
"context"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/utils/functional"
)

// Controller for the resource
type Controller struct {
// TODO docs
ctx context.Context
provisioners *sync.Map
scheduler *scheduling.Scheduler
launcher *allocation.Launcher
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// NewController is a constructor
func NewController(ctx context.Context, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider, provisioners *sync.Map) *Controller {
return &Controller{
ctx: ctx,
provisioners: provisioners,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
scheduler: scheduling.NewScheduler(kubeClient, cloudProvider),
launcher: &allocation.Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}},
}
}

// Reconcile executes a control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("provisioning").With("provisioner", req.Name))
provisioner := &v1alpha5.Provisioner{}
if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil {
if errors.IsNotFound(err) {
c.DeleteProvisioner(ctx, req)
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, &provisioner.Spec.Constraints)
if err != nil {
return reconcile.Result{}, err
}
provisioner.Spec.Labels = functional.UnionStringMaps(
provisioner.Spec.Labels,
map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name},
)
provisioner.Spec.Requirements = provisioner.Spec.Requirements.
With(scheduling.GlobalRequirements(instanceTypes)). // TODO(etarn) move GlobalRequirements to this file
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels))

// Stop the existing provisioner if it exists. This will drain the current
// workflow and replace it with an updated provisioner configuration.
// Requeue in order to discover any changes from GetInstanceTypes.
c.DeleteProvisioner(ctx, req)
c.CreateProvisioner(ctx, provisioner, instanceTypes)
return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
}

// DeleteProvisioner stops and removes a provisioner
func (c *Controller) DeleteProvisioner(ctx context.Context, req reconcile.Request) {
if p, ok := c.provisioners.LoadAndDelete(req.Name); ok { // key by name, matching CreateProvisioner's Store
p.(*Provisioner).stop(ctx)
}
}

func (c *Controller) CreateProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType) *Provisioner {
ctx, cancelFunc := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
instanceTypes: instanceTypes,
pods: make(chan *v1.Pod),
results: make(chan error),
scheduler: c.scheduler,
launcher: c.launcher,
cancelFunc: cancelFunc,
}
c.provisioners.Store(provisioner.Name, p)
go p.start(ctx)
return p
}

func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
return controllerruntime.
NewControllerManagedBy(m).
Named("provisioning").
For(&v1alpha5.Provisioner{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Complete(c)
}
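Reconcile above implements update as delete-then-create: cancel the running provisioner's context, then start a fresh goroutine with the new configuration. A self-contained sketch of that lifecycle (names and timings are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type provisioner struct {
	cancel context.CancelFunc
}

func (p *provisioner) start(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("provisioner %q stopped\n", name)
			return
		case <-time.After(50 * time.Millisecond):
			// Placeholder for one batch-and-launch cycle.
		}
	}
}

// reconcile mirrors DeleteProvisioner followed by CreateProvisioner: the old
// goroutine is cancelled before a replacement is stored and started.
func reconcile(provisioners *sync.Map, name string) {
	if old, ok := provisioners.LoadAndDelete(name); ok {
		old.(*provisioner).cancel()
	}
	ctx, cancel := context.WithCancel(context.Background())
	p := &provisioner{cancel: cancel}
	provisioners.Store(name, p)
	go p.start(ctx, name)
}

func main() {
	provisioners := &sync.Map{}
	reconcile(provisioners, "default") // first reconcile starts the loop
	time.Sleep(100 * time.Millisecond)
	reconcile(provisioners, "default") // a later reconcile replaces it
	time.Sleep(100 * time.Millisecond)
}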
120 changes: 120 additions & 0 deletions pkg/controllers/provisioning/provisioner.go
@@ -0,0 +1,120 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioning

import (
"context"
"fmt"
"time"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
)

var (
MaxBatchWindow = time.Second * 10
MinBatchWindow = time.Second * 1
)

// Provisioner is a stateful, thread-safe controller. Pods are enqueued and
// batched, capacity is launched, and pods are bound to the new capacity.
type Provisioner struct {
// State
*v1alpha5.Provisioner
instanceTypes []cloudprovider.InstanceType
pods chan *v1.Pod
results chan error
cancelFunc context.CancelFunc

// Dependencies
scheduler *scheduling.Scheduler
launcher *allocation.Launcher
}

// Start the provisioner's loop
func (p *Provisioner) start(ctx context.Context) {
logging.FromContext(ctx).Info("Starting provisioner")
for {
select {
case <-ctx.Done():
return
default:
if err := p.provision(ctx); err != nil {
logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error())
}
}
}
}

func (p *Provisioner) stop(ctx context.Context) {
logging.FromContext(ctx).Info("Stopping provisioner")
p.cancelFunc()
close(p.pods)
close(p.results)
}

func (p *Provisioner) provision(ctx context.Context) (err error) {
// Wait for a batch of pods
pods := p.Batch(ctx)
// Send results
defer func() {
for i := 0; i < len(pods); i++ {
p.results <- err
}
}()
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, p.instanceTypes, pods)
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
// Launch capacity and bind pods
if err := p.launcher.Launch(ctx, schedules, p.instanceTypes); err != nil {
return fmt.Errorf("launching capacity, %w", err)
}
return nil
}

func (p *Provisioner) Enqueue(ctx context.Context, pod *v1.Pod) error {
p.pods <- pod
return <-p.results
}

func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
pods = append(pods, <-p.pods)
timeout := time.NewTimer(MaxBatchWindow)
idle := time.NewTimer(MinBatchWindow)
start := time.Now()
defer func() {
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start))
}()
for {
select {
case <-ctx.Done():
return pods
case <-timeout.C:
return pods
case <-idle.C:
return pods
case pod := <-p.pods:
idle.Reset(MinBatchWindow)
pods = append(pods, pod)
}
}
}
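Batch above blocks for one pod, then keeps accepting pods until the stream is idle for MinBatchWindow or MaxBatchWindow elapses overall. A runnable sketch of just that batching behavior, with a channel of strings standing in for the pod channel:

package main

import (
	"fmt"
	"time"
)

var (
	maxBatchWindow = 10 * time.Second
	minBatchWindow = 1 * time.Second
)

// batch blocks for the first item, then collects more until the stream is
// idle for minBatchWindow or maxBatchWindow elapses, as in Provisioner.Batch.
func batch(items <-chan string) (batched []string) {
	batched = append(batched, <-items)
	timeout := time.NewTimer(maxBatchWindow)
	idle := time.NewTimer(minBatchWindow)
	defer timeout.Stop()
	defer idle.Stop()
	for {
		select {
		case <-timeout.C:
			return batched
		case <-idle.C:
			return batched
		case item := <-items:
			idle.Reset(minBatchWindow)
			batched = append(batched, item)
		}
	}
}

func main() {
	items := make(chan string)
	go func() {
		for _, name := range []string{"pod-a", "pod-b", "pod-c"} {
			items <- name
			time.Sleep(200 * time.Millisecond) // arrives within the idle window
		}
	}()
	fmt.Printf("batched: %v\n", batch(items)) // batched: [pod-a pod-b pod-c]
}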