Skip to content

Commit

Permalink
Minor refactor to move the scheduler to its own controller package (#839
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ellistarn authored Nov 23, 2021
1 parent 71ce076 commit 6d2ec9b
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 71 deletions.
11 changes: 5 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/awslabs/karpenter/pkg/controllers/metrics"
"github.com/awslabs/karpenter/pkg/controllers/node"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/controllers/scheduling"
"github.com/awslabs/karpenter/pkg/controllers/termination"
"github.com/awslabs/karpenter/pkg/utils/injection"
"github.com/awslabs/karpenter/pkg/utils/options"
Expand Down Expand Up @@ -81,14 +82,12 @@ func main() {
HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort),
})

terminator := termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider)
provisioner := provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider)
scheduler := provisioning.NewScheduler(manager.GetClient(), provisioner)
provisioners := provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider)

if err := manager.RegisterControllers(ctx,
provisioner,
scheduler,
terminator,
provisioners,
scheduling.NewController(manager.GetClient(), provisioners),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
node.NewController(manager.GetClient()),
metrics.NewController(manager.GetClient(), cloudProvider),
).Start(ctx); err != nil {
Expand Down
29 changes: 15 additions & 14 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/controllers/scheduling"
"github.com/awslabs/karpenter/pkg/test"
. "github.com/awslabs/karpenter/pkg/test/expectations"
"github.com/awslabs/karpenter/pkg/utils/injection"
Expand All @@ -49,8 +50,8 @@ var ctx context.Context
var env *test.Environment
var launchTemplateCache *cache.Cache
var fakeEC2API *fake.EC2API
var controller *provisioning.Controller
var scheduler *provisioning.Scheduler
var provisioners *provisioning.Controller
var scheduler *scheduling.Controller

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand Down Expand Up @@ -80,8 +81,8 @@ var _ = BeforeSuite(func() {
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
}
registry.RegisterOrDie(ctx, cloudProvider)
controller = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider)
scheduler = provisioning.NewScheduler(e.Client, controller)
provisioners = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider)
scheduler = scheduling.NewController(e.Client, provisioners)
})

Expect(env.Start()).To(Succeed(), "Failed to start environment")
Expand Down Expand Up @@ -109,7 +110,7 @@ var _ = Describe("Allocation", func() {
Context("Reconciliation", func() {
Context("Specialized Hardware", func() {
It("should launch instances for Nvidia GPU resource requests", func() {
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
Expand Down Expand Up @@ -142,7 +143,7 @@ var _ = Describe("Allocation", func() {
}
})
It("should launch instances for AWS Neuron resource requests", func() {
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
Expand Down Expand Up @@ -178,7 +179,7 @@ var _ = Describe("Allocation", func() {
})
Context("CapacityType", func() {
It("should default to on demand", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -187,7 +188,7 @@ var _ = Describe("Allocation", func() {
})
It("should launch spot capacity if flexible to both spot and on demand", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}}
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.LabelCapacityType: v1alpha1.CapacityTypeSpot}}),
)[0]
ExpectScheduled(ctx, env.Client, pod)
Expand Down Expand Up @@ -218,7 +219,7 @@ var _ = Describe("Allocation", func() {
Effect: "NoSchedule",
}

pod1 := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
pod1 := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
Tolerations: []v1.Toleration{t1, t2, t3},
}),
Expand All @@ -227,7 +228,7 @@ var _ = Describe("Allocation", func() {
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
name1 := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName

pod2 := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner,
pod2 := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
Tolerations: []v1.Toleration{t2, t3, t1},
}),
Expand All @@ -240,7 +241,7 @@ var _ = Describe("Allocation", func() {
})

It("should default to a generated launch template", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -250,7 +251,7 @@ var _ = Describe("Allocation", func() {
})
It("should allow a launch template to be specified", func() {
provider.LaunchTemplate = aws.String("test-launch-template")
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -262,7 +263,7 @@ var _ = Describe("Allocation", func() {
})
Context("Subnets", func() {
It("should default to the cluster's subnets", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
Expand All @@ -276,7 +277,7 @@ var _ = Describe("Allocation", func() {
})
Context("Security Groups", func() {
It("should default to the clusters security groups", func() {
pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateLaunchTemplateInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateLaunchTemplateInput.Pop().(*ec2.CreateLaunchTemplateInput)
Expand Down
24 changes: 23 additions & 1 deletion pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
}
provisioner.Spec.Labels = functional.UnionStringMaps(provisioner.Spec.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name})
provisioner.Spec.Requirements = provisioner.Spec.Requirements.
With(scheduling.GlobalRequirements(instanceTypes)). // TODO(etarn) move GlobalRequirements to this file
With(requirements(instanceTypes)).
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels))
ctx, cancelFunc := context.WithCancel(ctx)
p := &Provisioner{
Expand Down Expand Up @@ -126,6 +127,27 @@ func (c *Controller) List(ctx context.Context) []*Provisioner {
return provisioners
}

// requirements builds the set of requirements implied by the given instance
// types. It emits exactly one `In` requirement for each of the instance type,
// zone, architecture and capacity type labels, whose values are the union of
// what the instance types (and their offerings) report.
//
// The result is deterministic: requirements are emitted in a fixed key order
// and each requirement's values are sorted. When instanceTypes is empty, the
// four requirements are still returned, each with an empty value list.
func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
	// Keep the keys in a slice so the output order is stable; ranging over a
	// map would yield the requirements in a different order on every call.
	keys := []string{
		v1.LabelInstanceTypeStable,
		v1.LabelTopologyZone,
		v1.LabelArchStable,
		v1alpha5.LabelCapacityType,
	}
	supported := make(map[string]sets.String, len(keys))
	for _, key := range keys {
		supported[key] = sets.NewString()
	}
	for _, instanceType := range instanceTypes {
		// Zone and capacity type are properties of each offering rather than
		// of the instance type itself.
		for _, offering := range instanceType.Offerings() {
			supported[v1.LabelTopologyZone].Insert(offering.Zone)
			supported[v1alpha5.LabelCapacityType].Insert(offering.CapacityType)
		}
		supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name())
		supported[v1.LabelArchStable].Insert(instanceType.Architecture())
	}
	requirements = make(v1alpha5.Requirements, 0, len(keys))
	for _, key := range keys {
		requirements = append(requirements, v1.NodeSelectorRequirement{
			Key:      key,
			Operator: v1.NodeSelectorOpIn,
			// List returns the values sorted, unlike UnsortedList.
			Values: supported[key].List(),
		})
	}
	return requirements
}

// Register the controller to the manager
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.
Expand Down
24 changes: 1 addition & 23 deletions pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -87,27 +86,6 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner
return schedules, nil
}

// GlobalRequirements builds the set of requirements implied by the given
// instance types. It emits exactly one `In` requirement for each of the
// instance type, zone, architecture and capacity type labels, whose values are
// the union of what the instance types (and their offerings) report.
//
// The result is deterministic: requirements are emitted in a fixed key order
// and each requirement's values are sorted. When instanceTypes is empty, the
// four requirements are still returned, each with an empty value list.
func GlobalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
	// Keep the keys in a slice so the output order is stable; ranging over a
	// map would yield the requirements in a different order on every call.
	keys := []string{
		v1.LabelInstanceTypeStable,
		v1.LabelTopologyZone,
		v1.LabelArchStable,
		v1alpha5.LabelCapacityType,
	}
	supported := make(map[string]sets.String, len(keys))
	for _, key := range keys {
		supported[key] = sets.NewString()
	}
	for _, instanceType := range instanceTypes {
		// Zone and capacity type are properties of each offering rather than
		// of the instance type itself.
		for _, offering := range instanceType.Offerings() {
			supported[v1.LabelTopologyZone].Insert(offering.Zone)
			supported[v1alpha5.LabelCapacityType].Insert(offering.CapacityType)
		}
		supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name())
		supported[v1.LabelArchStable].Insert(instanceType.Architecture())
	}
	requirements = make(v1alpha5.Requirements, 0, len(keys))
	for _, key := range keys {
		requirements = append(requirements, v1.NodeSelectorRequirement{
			Key:      key,
			Operator: v1.NodeSelectorOpIn,
			// List returns the values sorted, unlike UnsortedList.
			Values: supported[key].List(),
		})
	}
	return requirements
}

// getSchedules separates pods into a set of schedules. All pods in each group
// contain isomorphic scheduling constraints and can be deployed together on the
// same node, or multiple similar nodes if the pods exceed one node's capacity.
Expand Down Expand Up @@ -153,7 +131,7 @@ func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha5.Constr
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}
// Include daemonsets that will schedule on this node
// Include DaemonSets that will schedule on this node
pods := []*v1.Pod{}
for _, daemonSet := range daemonSetList.Items {
pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec}
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/controllers/scheduling"
"github.com/awslabs/karpenter/pkg/test"
"github.com/awslabs/karpenter/pkg/utils/resources"

Expand All @@ -38,7 +39,7 @@ import (

var ctx context.Context
var controller *provisioning.Controller
var scheduler *provisioning.Scheduler
var scheduler *scheduling.Controller
var env *test.Environment

func TestAPIs(t *testing.T) {
Expand All @@ -52,7 +53,7 @@ var _ = BeforeSuite(func() {
cloudProvider := &fake.CloudProvider{}
registry.RegisterOrDie(ctx, cloudProvider)
controller = provisioning.NewController(ctx, e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider)
scheduler = provisioning.NewScheduler(e.Client, controller)
scheduler = scheduling.NewController(e.Client, controller)
})
Expect(env.Start()).To(Succeed(), "Failed to start environment")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioning
package scheduling

import (
"context"
"fmt"

"github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/utils/pod"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
Expand All @@ -33,26 +33,26 @@ import (
)

// Controller for the resource
type Scheduler struct {
kubeClient client.Client
controller *Controller
preferences *scheduling.Preferences
type Controller struct {
kubeClient client.Client
provisioners *provisioning.Controller
preferences *Preferences
}

// NewScheduler constructs a controller instance
func NewScheduler(kubeClient client.Client, controller *Controller) *Scheduler {
return &Scheduler{
kubeClient: kubeClient,
controller: controller,
preferences: scheduling.NewPreferences(),
// NewController constructs a controller instance
func NewController(kubeClient client.Client, provisioners *provisioning.Controller) *Controller {
return &Controller{
kubeClient: kubeClient,
provisioners: provisioners,
preferences: NewPreferences(),
}
}

// Reconcile the resource
func (s *Scheduler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("scheduling").With("pod", req.String()))
pod := &v1.Pod{}
if err := s.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil {
if err := c.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
Expand All @@ -67,19 +67,19 @@ func (s *Scheduler) Reconcile(ctx context.Context, req reconcile.Request) (recon
return reconcile.Result{}, nil
}
// Schedule and requeue. If successful, will terminate in the unschedulable check above
if err := s.Schedule(ctx, pod); err != nil {
if err := c.Schedule(ctx, pod); err != nil {
logging.FromContext(ctx).Errorf("Failed to schedule, %s", err.Error())
}
return reconcile.Result{Requeue: true}, nil
}

func (s *Scheduler) Schedule(ctx context.Context, pod *v1.Pod) error {
func (c *Controller) Schedule(ctx context.Context, pod *v1.Pod) error {
// Relax preferences if pod has previously failed to schedule.
s.preferences.Relax(ctx, pod)
c.preferences.Relax(ctx, pod)
// Pick provisioner
var provisioner *Provisioner
var provisioner *provisioning.Provisioner
var errs error
for _, candidate := range s.controller.List(ctx) {
for _, candidate := range c.provisioners.List(ctx) {
if err := candidate.Spec.DeepCopy().ValidatePod(pod); err != nil {
errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err))
} else {
Expand Down Expand Up @@ -171,11 +171,11 @@ func validateNodeSelectorTerm(term v1.NodeSelectorTerm) (errs error) {
return errs
}

func (s *Scheduler) Register(_ context.Context, m manager.Manager) error {
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.
NewControllerManagedBy(m).
Named("scheduling").
For(&v1.Pod{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 10_000}).
Complete(s)
Complete(c)
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning"
"github.com/awslabs/karpenter/pkg/controllers/scheduling"
"github.com/awslabs/karpenter/pkg/test"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -38,7 +39,7 @@ import (
var ctx context.Context
var provisioner *v1alpha5.Provisioner
var controller *provisioning.Controller
var scheduler *provisioning.Scheduler
var scheduler *scheduling.Controller
var env *test.Environment

func TestAPIs(t *testing.T) {
Expand All @@ -52,7 +53,7 @@ var _ = BeforeSuite(func() {
cloudProvider := &fake.CloudProvider{}
registry.RegisterOrDie(ctx, cloudProvider)
controller = provisioning.NewController(ctx, e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider)
scheduler = provisioning.NewScheduler(e.Client, controller)
scheduler = scheduling.NewController(e.Client, controller)
})
Expect(env.Start()).To(Succeed(), "Failed to start environment")
})
Expand Down
Loading

0 comments on commit 6d2ec9b

Please sign in to comment.