diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 005e01db72b4..3b6551e560e1 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -22,9 +22,9 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" "github.com/awslabs/karpenter/pkg/controllers" - "github.com/awslabs/karpenter/pkg/controllers/allocation" "github.com/awslabs/karpenter/pkg/controllers/metrics" "github.com/awslabs/karpenter/pkg/controllers/node" + "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/controllers/termination" "github.com/awslabs/karpenter/pkg/utils/options" "github.com/awslabs/karpenter/pkg/utils/restconfig" @@ -81,9 +81,14 @@ func main() { HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort), }) + terminator := termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider) + provisioner := provisioning.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider) + scheduler := provisioning.NewScheduler(manager.GetClient(), provisioner) + if err := manager.RegisterControllers(ctx, - allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider), - termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider), + provisioner, + scheduler, + terminator, node.NewController(manager.GetClient()), metrics.NewController(manager.GetClient(), cloudProvider), ).Start(ctx); err != nil { diff --git a/pkg/apis/provisioning/v1alpha5/constraints.go b/pkg/apis/provisioning/v1alpha5/constraints.go index 4a0b38ea19bb..750a123819dd 100644 --- a/pkg/apis/provisioning/v1alpha5/constraints.go +++ b/pkg/apis/provisioning/v1alpha5/constraints.go @@ -49,14 +49,14 @@ func (c *Constraints) Supports(pod *v1.Pod) error { podRequirements := PodRequirements(pod) for _, key := range podRequirements.Keys() { if c.Requirements.Requirement(key).Len() == 0 { - return fmt.Errorf("%s is too constrained", key) + return fmt.Errorf("invalid constraint %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList()) } } // The combined requirements are not compatible combined := c.Requirements.With(podRequirements) for _, key := range podRequirements.Keys() { if combined.Requirement(key).Len() == 0 { - return fmt.Errorf("%s is too constrained", key) + return fmt.Errorf("invalid constraint %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList()) } } return nil diff --git a/pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go b/pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go index 59a6407a056f..d1cb746bcf25 100644 --- a/pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go +++ b/pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go @@ -1,4 +1,3 @@ -//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go index 7fc53015f60a..83de00b8cc47 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go @@ -1,4 +1,3 @@ -//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 7e1a06188622..650879a5ecab 
100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,24 +18,19 @@ import ( "context" "encoding/json" "testing" - "time" "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" - "github.com/awslabs/karpenter/pkg/controllers/allocation" - "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/test" . "github.com/awslabs/karpenter/pkg/test/expectations" "github.com/awslabs/karpenter/pkg/utils/options" "github.com/awslabs/karpenter/pkg/utils/parallel" "github.com/awslabs/karpenter/pkg/utils/resources" "github.com/patrickmn/go-cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -53,8 +48,8 @@ var ctx context.Context var env *test.Environment var launchTemplateCache *cache.Cache var fakeEC2API *fake.EC2API -var controller reconcile.Reconciler -var opts options.Options +var controller *provisioning.Controller +var scheduler *provisioning.Scheduler func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -63,16 +58,12 @@ func TestAPIs(t *testing.T) { } var _ = BeforeSuite(func() { - opts = options.Options{ - ClusterName: "test-cluster", - ClusterEndpoint: "https://test-cluster", - } - ctx = options.Inject(ctx, opts) - launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) - fakeEC2API = &fake.EC2API{} - subnetProvider := NewSubnetProvider(fakeEC2API) - instanceTypeProvider := NewInstanceTypeProvider(fakeEC2API, subnetProvider) env = test.NewEnvironment(ctx, func(e *test.Environment) { + ctx = options.Inject(ctx, options.Options{ClusterName: "test-cluster", ClusterEndpoint: "https://test-cluster"}) + launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) + fakeEC2API = &fake.EC2API{} + subnetProvider := NewSubnetProvider(fakeEC2API) + instanceTypeProvider := NewInstanceTypeProvider(fakeEC2API, subnetProvider) clientSet := kubernetes.NewForConfigOrDie(e.Config) cloudProvider := &CloudProvider{ subnetProvider: subnetProvider, @@ -88,19 +79,8 @@ var _ = BeforeSuite(func() { creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } registry.RegisterOrDie(ctx, cloudProvider) - controller = &allocation.Controller{ - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Filter: &allocation.Filter{KubeClient: e.Client}, - Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), - Launcher: &allocation.Launcher{ - Packer: &binpacking.Packer{}, - KubeClient: e.Client, - CoreV1Client: clientSet.CoreV1(), - CloudProvider: cloudProvider, - }, - KubeClient: e.Client, - CloudProvider: cloudProvider, - } + controller = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider) + scheduler = provisioning.NewScheduler(e.Client, controller) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") @@ -128,38 +108,29 @@ var _ = Describe("Allocation", func() { Context("Reconciliation", func() { Context("Specialized Hardware", func() 
{ It("should launch instances for Nvidia GPU resource requests", func() { - // Setup - pod1 := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, - Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, - }, - }) - // Should pack onto same instance - pod2 := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, - Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, - }, - }) - // Should pack onto a separate instance - pod3 := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, - Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, - }, - }) - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - // Assertions - scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace()) - scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace()) - scheduled3 := ExpectPodExists(env.Client, pod3.GetName(), pod3.GetNamespace()) - Expect(scheduled1.Spec.NodeName).To(Equal(scheduled2.Spec.NodeName)) - Expect(scheduled1.Spec.NodeName).ToNot(Equal(scheduled3.Spec.NodeName)) - ExpectNodeExists(env.Client, scheduled1.Spec.NodeName) - ExpectNodeExists(env.Client, scheduled3.Spec.NodeName) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + }, + }), + // Should pack onto same instance + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, + }, + }), + // Should pack onto a separate instance + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, + }, + })) { + ExpectScheduled(ctx, env.Client, pod) + } Expect(InstancesLaunchedFrom(fakeEC2API.CalledWithCreateFleetInput.Iter())).To(Equal(2)) overrides := []*ec2.FleetLaunchTemplateOverridesRequest{} for i := range fakeEC2API.CalledWithCreateFleetInput.Iter() { @@ -170,38 +141,31 @@ var _ = Describe("Allocation", func() { } }) It("should launch instances for AWS Neuron resource requests", func() { - // Setup - pod1 := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, - Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, - }, - }) - // Should pack onto same instance - pod2 := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, - Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, - }, - }) - // Should pack onto a separate instance - pod3 := 
test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")}, - Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")}, - }, - }) - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod1, pod2, pod3) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - // Assertions - scheduled1 := ExpectPodExists(env.Client, pod1.GetName(), pod1.GetNamespace()) - scheduled2 := ExpectPodExists(env.Client, pod2.GetName(), pod2.GetNamespace()) - scheduled3 := ExpectPodExists(env.Client, pod3.GetName(), pod3.GetNamespace()) - Expect(scheduled1.Spec.NodeName).To(Equal(scheduled2.Spec.NodeName)) - Expect(scheduled1.Spec.NodeName).ToNot(Equal(scheduled3.Spec.NodeName)) - ExpectNodeExists(env.Client, scheduled1.Spec.NodeName) - ExpectNodeExists(env.Client, scheduled3.Spec.NodeName) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + }, + }), + // Should pack onto same instance + + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + }, + }), + // Should pack onto a separate instance + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("4")}, + }, + }), + ) { + ExpectScheduled(ctx, env.Client, pod) + } Expect(InstancesLaunchedFrom(fakeEC2API.CalledWithCreateFleetInput.Iter())).To(Equal(2)) overrides := []*ec2.FleetLaunchTemplateOverridesRequest{} for input := range fakeEC2API.CalledWithCreateFleetInput.Iter() { @@ -214,25 +178,19 @@ var _ = Describe("Allocation", func() { }) Context("CapacityType", func() { It("should default to on demand", func() { - // Setup - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeOnDemand)) }) It("should launch spot capacity if flexible to both spot and on demand", func() { - // Setup provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.LabelCapacityType: 
v1alpha1.CapacityTypeSpot}}), - ) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + )[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) @@ -260,34 +218,30 @@ var _ = Describe("Allocation", func() { Effect: "NoSchedule", } - ExpectCreated(env.Client, provisioner) - pod1 := test.UnschedulablePod(test.PodOptions{ - Tolerations: []v1.Toleration{t1, t2, t3}, - }) - pod2 := test.UnschedulablePod(test.PodOptions{ - Tolerations: []v1.Toleration{t2, t3, t1}, - }) - // Ensure it's on its own node - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, pod1) - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod1 := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + Tolerations: []v1.Toleration{t1, t2, t3}, + }), + )[0] + ExpectScheduled(ctx, env.Client, pod1) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) - input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) - name1 := *(input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName) - // Ensure it's on its own node - pods2 := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, pod2) - ExpectNodeExists(env.Client, pods2[0].Spec.NodeName) + name1 := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName + + pod2 := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + Tolerations: []v1.Toleration{t2, t3, t1}, + }), + )[0] + + ExpectScheduled(ctx, env.Client, pod2) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) - input2 := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) - name2 := *(input2.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName) + name2 := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName Expect(name1).To(Equal(name2)) }) It("should default to a generated launch template", func() { - // Setup - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) @@ -295,13 +249,9 @@ var _ = Describe("Allocation", func() { Expect(*launchTemplate.Version).To(Equal("$Default")) }) It("should allow a launch template to be specified", func() { - // Setup provider.LaunchTemplate = aws.String("test-launch-template") - provisioner = ProvisionerWithProvider(provisioner, provider) - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, 
ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) @@ -312,11 +262,8 @@ var _ = Describe("Allocation", func() { }) Context("Subnets", func() { It("should default to the cluster's subnets", func() { - // Setup - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) @@ -329,11 +276,8 @@ var _ = Describe("Allocation", func() { }) Context("Security Groups", func() { It("should default to the clusters security groups", func() { - // Setup - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0] + ExpectScheduled(ctx, env.Client, pod) Expect(fakeEC2API.CalledWithCreateLaunchTemplateInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateLaunchTemplateInput.Pop().(*ec2.CreateLaunchTemplateInput) Expect(input.LaunchTemplateData.SecurityGroupIds).To(ConsistOf( diff --git a/pkg/controllers/allocation/batch.go b/pkg/controllers/allocation/batch.go deleted file mode 100644 index 6207aa41f0bc..000000000000 --- a/pkg/controllers/allocation/batch.go +++ /dev/null @@ -1,173 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
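For context on the GPU and Neuron tests earlier in this file: they encode a packing expectation rather than an algorithm — pods asking for 1 and 2 accelerators should share a node while the 4-accelerator pod forces a second one, hence the assertion of exactly 2 CreateFleet calls. A toy first-fit sketch of that behavior follows; it is purely illustrative, with a single made-up capacity number per node, and is not the real binpacking.Packer, which packs against full instance-type shapes.

package main

import "fmt"

// node is a toy bin with a single accelerator capacity.
type node struct{ free int64 }

// pack places each request on the first node with room, creating nodes as needed.
func pack(capacity int64, requests []int64) []node {
	var nodes []node
	for _, r := range requests {
		placed := false
		for i := range nodes {
			if nodes[i].free >= r {
				nodes[i].free -= r
				placed = true
				break
			}
		}
		if !placed {
			nodes = append(nodes, node{free: capacity - r})
		}
	}
	return nodes
}

func main() {
	// With 4 accelerators per node, pods needing 1 and 2 share a node; the 4-unit pod gets its own.
	fmt.Println(len(pack(4, []int64{1, 2, 4}))) // 2
}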
-*/ - -package allocation - -import ( - "context" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" -) - -const ( - opAdd = "add" - opWait = "wait" -) - -// Batcher is a batch manager for multiple objects -type Batcher struct { - // MaxPeriod is the maximum amount of time to batch incoming pods before flushing - MaxPeriod time.Duration - // IdlePeriod is the amount of time to wait to flush a batch when there are no incoming pods but the batch is not empty - // It should be a smaller duration than MaxPeriod - IdlePeriod time.Duration - - // windows keeps a mapping of a key (like a provisioner name and namespace) to a specific object's batch window - windows map[types.UID]*window - // ops is a stream of add and wait operations on a batch window - ops chan *batchOp - // isMonitorRunning indicates if the monitor go routine has been started - isMonitorRunning bool -} - -type batchOp struct { - kind string - key types.UID - waitEnd chan bool -} - -// window is an individual batch window -type window struct { - lastUpdated time.Time - started time.Time - closed []chan bool -} - -// NewBatcher creates a new batch manager to start multiple batch windows -func NewBatcher(maxPeriod time.Duration, idlePeriod time.Duration) *Batcher { - return &Batcher{ - MaxPeriod: maxPeriod, - IdlePeriod: idlePeriod, - windows: map[types.UID]*window{}, - } -} - -// Start should be called before Add or Wait -// It is not safe to call Start concurrently -// but Start can be called synchronously multiple times w/ no effect -func (b *Batcher) Start(ctx context.Context) { - if !b.isMonitorRunning { - b.ops = make(chan *batchOp, 1000) - go b.monitor(ctx) - b.isMonitorRunning = true - } -} - -// Add starts a batching window or adds to an existing in-progress window -// Add is safe to be called concurrently -func (b *Batcher) Add(obj metav1.Object) { - select { - case b.ops <- &batchOp{kind: opAdd, key: obj.GetUID()}: - // Do not block if the channel is full - default: - } -} - -// Wait blocks until a batching window ends -// If the batch is empty, it will block until something is added or the window times out -func (b *Batcher) Wait(obj metav1.Object) { - waitBatchOp := &batchOp{kind: opWait, key: obj.GetUID(), waitEnd: make(chan bool, 1)} - timeout := time.NewTimer(b.MaxPeriod) - select { - case b.ops <- waitBatchOp: - <-waitBatchOp.waitEnd - // if the ops channel is full (should be very rare), allow wait to block until the MaxPeriod - case <-timeout.C: - } -} - -// monitor is a synchronous loop that controls the window start, update, and end -// monitor should be executed in one go routine and will handle all object batch windows -func (b *Batcher) monitor(ctx context.Context) { - defer func() { b.isMonitorRunning = false }() - ticker := time.NewTicker(b.IdlePeriod / 2) - for { - select { - // Wake and check for any timed out batch windows - case <-ticker.C: - for key, batch := range b.windows { - b.checkForWindowEndAndNotify(key, batch) - } - // Process window operations - case op := <-b.ops: - switch op.kind { - // Start a new window or update progress on a window - case opAdd: - b.startOrUpdateWindow(op.key) - // Register a waiter and start a window if no window has been started - case opWait: - window, ok := b.windows[op.key] - if !ok { - window = b.startOrUpdateWindow(op.key) - } - window.closed = append(window.closed, op.waitEnd) - } - // Stop monitor routine on shutdown - case <-ctx.Done(): - for key, window := range b.windows { - b.endWindow(key, window) - } - return - } 
- } -} - -// checkForWindowEndAndNotify checks if a window has timed out due to inactivity (IdlePeriod) or has reached the MaxBatchPeriod. -// If the batch window has ended, then the batch closed channel will be notified and the window will be removed -func (b *Batcher) checkForWindowEndAndNotify(key types.UID, window *window) { - if time.Since(window.lastUpdated) < b.IdlePeriod && time.Since(window.started) < b.MaxPeriod { - return - } - b.endWindow(key, window) -} - -// endWindow signals the end of a window to all wait consumers and deletes the window -func (b *Batcher) endWindow(key types.UID, window *window) { - for _, end := range window.closed { - select { - case end <- true: - close(end) - default: - } - } - delete(b.windows, key) -} - -// startOrUpdateWindow starts a new window for the object key if one does not already exist -// if a window already exists for the object key, then the lastUpdate time is set -func (b *Batcher) startOrUpdateWindow(key types.UID) *window { - batchWindow, ok := b.windows[key] - if !ok { - batchWindow = &window{lastUpdated: time.Now(), started: time.Now()} - b.windows[key] = batchWindow - return batchWindow - } - batchWindow.lastUpdated = time.Now() - if batchWindow.started.IsZero() { - batchWindow.started = time.Now() - } - return batchWindow -} diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go deleted file mode 100644 index 0b446f73a438..000000000000 --- a/pkg/controllers/allocation/controller.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
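The deleted Batcher above coordinated per-provisioner windows through an ops channel and a monitor goroutine; its replacement (Provisioner.Batch, later in this diff) keeps only the core idea of an idle timeout bounded by a maximum window. A minimal, self-contained sketch of that two-timer pattern is below — the item type, channel sizes, and durations are examples, not the deleted implementation.

package main

import (
	"fmt"
	"time"
)

// batch drains items until either no new item arrives for idle, or maxWait
// elapses since the first item arrived.
func batch(items <-chan string, idle, maxWait time.Duration) []string {
	out := []string{<-items} // block until the window opens
	timeout := time.NewTimer(maxWait)
	idleTimer := time.NewTimer(idle)
	for {
		select {
		case item := <-items:
			idleTimer.Reset(idle)
			out = append(out, item)
		case <-timeout.C:
			return out
		case <-idleTimer.C:
			return out
		}
	}
}

func main() {
	items := make(chan string, 3)
	items <- "pod-a"
	items <- "pod-b"
	items <- "pod-c"
	fmt.Println(batch(items, 100*time.Millisecond, time.Second)) // [pod-a pod-b pod-c]
}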
-*/ - -package allocation - -import ( - "context" - "fmt" - "time" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" - "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" -) - -const ( - maxBatchWindow = 10 * time.Second - batchIdleTimeout = 1 * time.Second -) - -// Controller for the resource -type Controller struct { - Batcher *Batcher - Filter *Filter - Scheduler *scheduling.Scheduler - Launcher *Launcher - KubeClient client.Client - CloudProvider cloudprovider.CloudProvider -} - -// NewController constructs a controller instance -func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller { - return &Controller{ - Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), - Filter: &Filter{KubeClient: kubeClient}, - Scheduler: scheduling.NewScheduler(kubeClient, cloudProvider), - Launcher: &Launcher{ - Packer: &binpacking.Packer{}, - CloudProvider: cloudProvider, - KubeClient: kubeClient, - CoreV1Client: coreV1Client, - }, - KubeClient: kubeClient, - CloudProvider: cloudProvider, - } -} - -// Reconcile executes an allocation control loop for the resource -func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(fmt.Sprintf("allocation.provisioner/%s", req.Name))) - logging.FromContext(ctx).Infof("Starting provisioning loop") - // Fetch provisioner - provisioner, err := c.provisionerFor(ctx, req.NamespacedName) - if err != nil { - if errors.IsNotFound(err) { - c.Batcher.Wait(&v1alpha5.Provisioner{}) - logging.FromContext(ctx).Errorf("Provisioner \"%s\" not found. 
Create the \"default\" provisioner or specify an alternative using the nodeSelector %s", req.Name, v1alpha5.ProvisionerNameLabelKey) - return reconcile.Result{}, nil - } - return reconcile.Result{}, err - } - // Wait on a pod batch - logging.FromContext(ctx).Infof("Waiting to batch additional pods") - c.Batcher.Wait(provisioner) - // Filter pods - pods, err := c.Filter.GetProvisionablePods(ctx, provisioner) - if err != nil { - return reconcile.Result{}, fmt.Errorf("filtering pods, %w", err) - } - logging.FromContext(ctx).Infof("Found %d provisionable pods", len(pods)) - if len(pods) == 0 { - logging.FromContext(ctx).Infof("Watching for pod events") - return reconcile.Result{}, nil - } - // Get Instance Types Options - instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx, &provisioner.Spec.Constraints) - if err != nil { - return reconcile.Result{}, fmt.Errorf("getting instance types, %w", err) - } - // Separate pods by scheduling constraints - schedules, err := c.Scheduler.Solve(ctx, provisioner, instanceTypes, pods) - if err != nil { - return reconcile.Result{}, fmt.Errorf("solving scheduling constraints, %w", err) - } - // Launch capacity and bind pods - if err := c.Launcher.Launch(ctx, schedules, instanceTypes); err != nil { - return reconcile.Result{}, err - } - return reconcile.Result{Requeue: true}, nil -} - -func (c *Controller) Register(ctx context.Context, m manager.Manager) error { - err := controllerruntime. - NewControllerManagedBy(m). - Named("Allocation"). - For(&v1alpha5.Provisioner{}). - Watches( - &source.Kind{Type: &v1.Pod{}}, - handler.EnqueueRequestsFromMapFunc(c.podToProvisioner(ctx)), - // Only process pod update events - builder.WithPredicates( - predicate.Funcs{ - CreateFunc: func(_ event.CreateEvent) bool { return false }, - DeleteFunc: func(_ event.DeleteEvent) bool { return false }, - GenericFunc: func(_ event.GenericEvent) bool { return false }, - }, - ), - ). - WithOptions(controller.Options{MaxConcurrentReconciles: 10}). 
- Complete(c) - c.Batcher.Start(ctx) - return err -} - -// provisionerFor fetches the provisioner and returns a provisioner w/ default runtime values -func (c *Controller) provisionerFor(ctx context.Context, name types.NamespacedName) (*v1alpha5.Provisioner, error) { - provisioner := &v1alpha5.Provisioner{} - if err := c.KubeClient.Get(ctx, name, provisioner); err != nil { - return nil, err - } - return provisioner, nil -} - -// podToProvisioner is a function handler to transform pod objs to provisioner reconcile requests -func (c *Controller) podToProvisioner(ctx context.Context) func(o client.Object) []reconcile.Request { - return func(o client.Object) (requests []reconcile.Request) { - pod := o.(*v1.Pod) - if err := c.Filter.isUnschedulable(pod); err != nil { - return nil - } - provisionerKey := v1alpha5.DefaultProvisioner - if name, ok := pod.Spec.NodeSelector[v1alpha5.ProvisionerNameLabelKey]; ok { - provisionerKey.Name = name - } - provisioner, err := c.provisionerFor(ctx, provisionerKey) - if err != nil { - if errors.IsNotFound(err) { - // Queue and batch a reconcile request for a non-existent, empty provisioner - // This will reduce the number of repeated error messages about a provisioner not existing - c.Batcher.Add(&v1alpha5.Provisioner{}) - notFoundProvisioner := v1alpha5.DefaultProvisioner.Name - if name, ok := pod.Spec.NodeSelector[v1alpha5.ProvisionerNameLabelKey]; ok { - notFoundProvisioner = name - } - return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: notFoundProvisioner}}} - } - return nil - } - if err = c.Filter.isProvisionable(pod, provisioner); err != nil { - return nil - } - c.Batcher.Add(provisioner) - return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: provisioner.Name}}} - } -} diff --git a/pkg/controllers/allocation/filter.go b/pkg/controllers/allocation/filter.go deleted file mode 100644 index b7400b6f9b61..000000000000 --- a/pkg/controllers/allocation/filter.go +++ /dev/null @@ -1,133 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package allocation - -import ( - "context" - "fmt" - - "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" - "github.com/awslabs/karpenter/pkg/utils/pod" - "github.com/awslabs/karpenter/pkg/utils/ptr" - "go.uber.org/multierr" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" - "knative.dev/pkg/logging" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -type Filter struct { - KubeClient client.Client -} - -func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha5.Provisioner) ([]*v1.Pod, error) { - // 1. List Pods that aren't scheduled - pods := &v1.PodList{} - if err := f.KubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": ""}); err != nil { - return nil, fmt.Errorf("listing unscheduled pods, %w", err) - } - // 2. 
Filter pods that aren't provisionable - provisionable := []*v1.Pod{} - for i := range pods.Items { - p := pods.Items[i] - if err := f.isProvisionable(&p, provisioner); err != nil { - logging.FromContext(ctx).Debugf("Ignored pod %s/%s when allocating for provisioner %s, %s", - p.Name, p.Namespace, provisioner.Name, err.Error(), - ) - continue - } - provisionable = append(provisionable, ptr.Pod(p)) - } - return provisionable, nil -} - -func (f *Filter) isProvisionable(pod *v1.Pod, provisioner *v1alpha5.Provisioner) error { - return multierr.Combine( - f.isUnschedulable(pod), - f.validateAffinity(pod), - f.validateTopology(pod), - f.matchesProvisioner(pod, provisioner), - ) -} - -func (f *Filter) isUnschedulable(p *v1.Pod) error { - if !pod.FailedToSchedule(p) { - return fmt.Errorf("awaiting scheduling") - } - if pod.IsOwnedByDaemonSet(p) { - return fmt.Errorf("owned by daemonset") - } - if pod.IsOwnedByNode(p) { - return fmt.Errorf("owned by node") - } - return nil -} - -func (f *Filter) matchesProvisioner(pod *v1.Pod, provisioner *v1alpha5.Provisioner) error { - name, ok := pod.Spec.NodeSelector[v1alpha5.ProvisionerNameLabelKey] - if ok && provisioner.Name == name { - return nil - } - if !ok && provisioner.Name == v1alpha5.DefaultProvisioner.Name { - return nil - } - return fmt.Errorf("matched another provisioner, %s", name) -} - -func (f *Filter) validateTopology(pod *v1.Pod) (errs error) { - for _, constraint := range pod.Spec.TopologySpreadConstraints { - if supported := sets.NewString(v1.LabelHostname, v1.LabelTopologyZone); !supported.Has(constraint.TopologyKey) { - errs = multierr.Append(errs, fmt.Errorf("unsupported topology key, %s not in %s", constraint.TopologyKey, supported)) - } - } - return errs -} - -func (f *Filter) validateAffinity(pod *v1.Pod) (errs error) { - if pod.Spec.Affinity == nil { - return nil - } - if pod.Spec.Affinity.PodAffinity != nil { - errs = multierr.Append(errs, fmt.Errorf("pod affinity is not supported")) - } - if pod.Spec.Affinity.PodAntiAffinity != nil { - errs = multierr.Append(errs, fmt.Errorf("pod anti-affinity is not supported")) - } - if pod.Spec.Affinity.NodeAffinity != nil { - for _, term := range pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - errs = multierr.Append(errs, validateNodeSelectorTerm(term.Preference)) - } - if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { - for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { - errs = multierr.Append(errs, validateNodeSelectorTerm(term)) - } - } - } - return errs -} - -func validateNodeSelectorTerm(term v1.NodeSelectorTerm) (errs error) { - if term.MatchFields != nil { - errs = multierr.Append(errs, fmt.Errorf("matchFields is not supported")) - } - if term.MatchExpressions != nil { - for _, requirement := range term.MatchExpressions { - if !sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn)).Has(string(requirement.Operator)) { - errs = multierr.Append(errs, fmt.Errorf("unsupported operator, %s", requirement.Operator)) - } - } - } - return errs -} diff --git a/pkg/controllers/node/controller.go b/pkg/controllers/node/controller.go index 797652a5810c..f7d4387ecd31 100644 --- a/pkg/controllers/node/controller.go +++ b/pkg/controllers/node/controller.go @@ -59,7 +59,7 @@ type Controller struct { // Reconcile executes a reallocation control loop for the resource func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) 
(reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("Node")) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("node")) // 1. Retrieve Node, ignore if not provisioned or terminating stored := &v1.Node{} if err := c.kubeClient.Get(ctx, req.NamespacedName, stored); err != nil { @@ -115,7 +115,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco func (c *Controller) Register(ctx context.Context, m manager.Manager) error { return controllerruntime. NewControllerManagedBy(m). - Named("Node"). + Named("node"). For(&v1.Node{}). Watches( // Reconcile all nodes related to a provisioner when it changes. diff --git a/pkg/controllers/allocation/binpacking/packable.go b/pkg/controllers/provisioning/binpacking/packable.go similarity index 99% rename from pkg/controllers/allocation/binpacking/packable.go rename to pkg/controllers/provisioning/binpacking/packable.go index ea801fdd5421..daa63ffcb1f0 100644 --- a/pkg/controllers/allocation/binpacking/packable.go +++ b/pkg/controllers/provisioning/binpacking/packable.go @@ -19,7 +19,7 @@ import ( "fmt" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/utils/resources" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" diff --git a/pkg/controllers/allocation/binpacking/packer.go b/pkg/controllers/provisioning/binpacking/packer.go similarity index 94% rename from pkg/controllers/allocation/binpacking/packer.go rename to pkg/controllers/provisioning/binpacking/packer.go index eb59a5b3c17f..578ff5c0909f 100644 --- a/pkg/controllers/allocation/binpacking/packer.go +++ b/pkg/controllers/provisioning/binpacking/packer.go @@ -21,7 +21,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/metrics" "github.com/awslabs/karpenter/pkg/utils/apiobject" "github.com/awslabs/karpenter/pkg/utils/resources" @@ -97,14 +97,13 @@ func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan if mainPack, ok := packs[key]; ok { mainPack.NodeQuantity++ mainPack.Pods = append(mainPack.Pods, packing.Pods...) 
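For the packer.go hunk in progress here: packings that resolve to the same instance-type options are merged into one entry whose NodeQuantity is incremented, which is why the per-increment Debugf below is dropped and the remaining Infof reports the node count. A stripped-down sketch of that merge-by-key bookkeeping follows; the packing struct is a stand-in, not the real binpacking type.

package main

import "fmt"

// packing is a stand-in for the packer's result: which pods go together and
// how many identical nodes are needed for them.
type packing struct {
	InstanceTypes []string
	Pods          []string
	NodeQuantity  int
}

// merge collapses packings with identical instance-type options into a single
// entry, bumping NodeQuantity instead of emitting a duplicate.
func merge(in []packing) []packing {
	byKey := map[string]*packing{}
	out := []*packing{}
	for _, p := range in {
		key := fmt.Sprint(p.InstanceTypes)
		if existing, ok := byKey[key]; ok {
			existing.NodeQuantity++
			existing.Pods = append(existing.Pods, p.Pods...)
			continue
		}
		p := p
		byKey[key] = &p
		out = append(out, &p)
	}
	result := []packing{}
	for _, p := range out {
		result = append(result, *p)
	}
	return result
}

func main() {
	merged := merge([]packing{
		{InstanceTypes: []string{"m5.large"}, Pods: []string{"a"}, NodeQuantity: 1},
		{InstanceTypes: []string{"m5.large"}, Pods: []string{"b"}, NodeQuantity: 1},
		{InstanceTypes: []string{"p3.8xlarge"}, Pods: []string{"c"}, NodeQuantity: 1},
	})
	for _, p := range merged {
		fmt.Printf("%v x%d pods=%v\n", p.InstanceTypes, p.NodeQuantity, p.Pods)
	}
}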
- logging.FromContext(ctx).Debugf("Incremented node count to %d on packing for %d pod(s) with instance type option(s) %v", mainPack.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(mainPack.InstanceTypeOptions)) continue } else { packs[key] = packing } } packings = append(packings, packing) - logging.FromContext(ctx).Infof("Computed packing for %d pod(s) with instance type option(s) %s", flattenedLen(packing.Pods...), instanceTypeNames(packing.InstanceTypeOptions)) + logging.FromContext(ctx).Infof("Computed packing of %d nodes for %d pod(s) with instance type option(s) %s", packing.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(packing.InstanceTypeOptions)) } return packings } diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go new file mode 100644 index 000000000000..a668c3455940 --- /dev/null +++ b/pkg/controllers/provisioning/controller.go @@ -0,0 +1,115 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package provisioning + +import ( + "context" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "knative.dev/pkg/logging" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/binpacking" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" +) + +// Controller for the resource +type Controller struct { + ctx context.Context + provisioners *sync.Map + scheduler *scheduling.Scheduler + launcher *Launcher + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +// NewController is a constructor +func NewController(ctx context.Context, kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + ctx: ctx, + provisioners: &sync.Map{}, + kubeClient: kubeClient, + cloudProvider: cloudProvider, + scheduler: scheduling.NewScheduler(kubeClient, cloudProvider), + launcher: &Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}}, + } +} + +// Reconcile a control loop for the resource +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("provisioning").With("provisioner", req.Name)) + provisioner := &v1alpha5.Provisioner{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil { + if errors.IsNotFound(err) { + c.Delete(ctx, req.Name) + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + // Apply the latest configuration + if err := 
c.Apply(ctx, provisioner); err != nil { + return reconcile.Result{}, err + } + // Requeue in order to discover any changes from GetInstanceTypes. + return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil +} + +// Delete stops and removes a provisioner +func (c *Controller) Delete(ctx context.Context, name string) { + if p, ok := c.provisioners.LoadAndDelete(name); ok { + p.(*Provisioner).Stop() + } +} + +// Apply creates or updates the provisioner to the latest configuration +func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisioner) error { + // Stop the existing provisioner if exists. This will drain the current + // workflow and replace it with an updated provisioner configuration. + c.Delete(ctx, provisioner.Name) + instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, &provisioner.Spec.Constraints) + if err != nil { + return err + } + c.provisioners.Store(provisioner.Name, NewProvisioner(ctx, provisioner, instanceTypes, c.scheduler, c.launcher)) + return nil +} + +// List the active provisioners +func (c *Controller) List(ctx context.Context) []*Provisioner { + provisioners := []*Provisioner{} + c.provisioners.Range(func(key, value interface{}) bool { + provisioners = append(provisioners, value.(*Provisioner)) + return true + }) + return provisioners +} + +// Register the controller to the manager +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime. + NewControllerManagedBy(m). + Named("provisioning"). + For(&v1alpha5.Provisioner{}). + WithOptions(controller.Options{MaxConcurrentReconciles: 10}). + Complete(c) +} diff --git a/pkg/controllers/allocation/launcher.go b/pkg/controllers/provisioning/launcher.go similarity index 96% rename from pkg/controllers/allocation/launcher.go rename to pkg/controllers/provisioning/launcher.go index ae6439b2a561..5a9c97ad6853 100644 --- a/pkg/controllers/allocation/launcher.go +++ b/pkg/controllers/provisioning/launcher.go @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package allocation +package provisioning import ( "context" @@ -20,8 +20,8 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/binpacking" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/metrics" "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go new file mode 100644 index 000000000000..28f273b6cd72 --- /dev/null +++ b/pkg/controllers/provisioning/provisioner.go @@ -0,0 +1,138 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provisioning + +import ( + "context" + "fmt" + "time" + + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" + "github.com/awslabs/karpenter/pkg/utils/functional" + v1 "k8s.io/api/core/v1" + "knative.dev/pkg/logging" +) + +var ( + MaxBatchDuration = time.Second * 10 + MinBatchDuration = time.Second * 1 +) + +// Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity. +type Provisioner struct { + // State + *v1alpha5.Provisioner + instanceTypes []cloudprovider.InstanceType + pods chan *v1.Pod + results chan error + Stop context.CancelFunc + + // Dependencies + scheduler *scheduling.Scheduler + launcher *Launcher +} + +// NewProvisioner creates a new provisioner and starts the provisioning loop +func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, scheduler *scheduling.Scheduler, launcher *Launcher) *Provisioner { + provisioner.Spec.Labels = functional.UnionStringMaps(provisioner.Spec.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}) + provisioner.Spec.Requirements = provisioner.Spec.Requirements. + With(scheduling.GlobalRequirements(instanceTypes)). // TODO(etarn) move GlobalRequirements to this file + With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)) + ctx, cancelFunc := context.WithCancel(ctx) + p := &Provisioner{ + Provisioner: provisioner, + instanceTypes: instanceTypes, + pods: make(chan *v1.Pod), + results: make(chan error), + scheduler: scheduler, + launcher: launcher, + Stop: cancelFunc, + } + p.Start(ctx) + return p +} + +func (p *Provisioner) Start(ctx context.Context) { + go func() { + logging.FromContext(ctx).Info("Starting provisioner") + for { + select { + case <-ctx.Done(): + logging.FromContext(ctx).Info("Stopping provisioner") + close(p.pods) + close(p.results) + return + default: + if err := p.Provision(ctx); err != nil { + logging.FromContext(ctx).Errorf("Provisioning failed, %s", err.Error()) + } + } + } + }() +} + +func (p *Provisioner) Provision(ctx context.Context) (err error) { + // Wait for a batch of pods + pods := p.Batch(ctx) + // Send results + defer func() { + for i := 0; i < len(pods); i++ { + p.results <- err + } + }() + // Separate pods by scheduling constraints + schedules, err := p.scheduler.Solve(ctx, p.Provisioner, p.instanceTypes, pods) + if err != nil { + return fmt.Errorf("solving scheduling constraints, %w", err) + } + // Launch capacity and bind pods + if err := p.launcher.Launch(ctx, schedules, p.instanceTypes); err != nil { + return fmt.Errorf("launching capacity, %w", err) + } + return nil +} + +// Enqueue a pod to the provisioner and block until it's processed +func (p *Provisioner) Enqueue(ctx context.Context, pod *v1.Pod) error { + p.pods <- pod + return <-p.results +} + +// Batch returns a slice of enqueued pods after idle or timeout +func (p *Provisioner) Batch(ctx context.Context) (pods []*v1.Pod) { + logging.FromContext(ctx).Infof("Waiting for unschedulable pods") + pods = append(pods, <-p.pods) + timeout := time.NewTimer(MaxBatchDuration) + idle := time.NewTimer(MinBatchDuration) + start := time.Now() + defer func() { + logging.FromContext(ctx).Infof("Batched %d pods in %s", len(pods), time.Since(start)) + }() + 
for { + select { + case pod := <-p.pods: + idle.Reset(MinBatchDuration) + pods = append(pods, pod) + case <-ctx.Done(): + return pods + case <-timeout.C: + return pods + case <-idle.C: + return pods + } + } +} diff --git a/pkg/controllers/provisioning/scheduler.go b/pkg/controllers/provisioning/scheduler.go new file mode 100644 index 000000000000..bed8906f2946 --- /dev/null +++ b/pkg/controllers/provisioning/scheduler.go @@ -0,0 +1,182 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provisioning + +import ( + "context" + "fmt" + + "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" + "github.com/awslabs/karpenter/pkg/utils/pod" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/logging" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// Controller for the resource +type Scheduler struct { + kubeClient client.Client + controller *Controller + preferences *scheduling.Preferences +} + +// NewScheduler constructs a controller instance +func NewScheduler(kubeClient client.Client, controller *Controller) *Scheduler { + return &Scheduler{ + kubeClient: kubeClient, + controller: controller, + preferences: scheduling.NewPreferences(), + } +} + +// Reconcile the resource +func (s *Scheduler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("scheduling").With("pod", req.String())) + pod := &v1.Pod{} + if err := s.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // Ensure the pod can be provisioned, but only log + if err := isUnschedulable(pod); err != nil { + return reconcile.Result{}, nil + } + if err := validate(pod); err != nil { + logging.FromContext(ctx).Debugf("Ignoring pod, %s", err.Error()) + return reconcile.Result{}, nil + } + // Schedule and requeue. If successful, will terminate in the unschedulable check above + if err := s.Schedule(ctx, pod); err != nil { + logging.FromContext(ctx).Errorf("Failed to schedule, %s", err.Error()) + } + return reconcile.Result{Requeue: true}, nil +} + +func (s *Scheduler) Schedule(ctx context.Context, pod *v1.Pod) error { + // Relax preferences if pod has previously failed to schedule. 
+ s.preferences.Relax(ctx, pod) + // Pick provisioner + var provisioner *Provisioner + var errs error + for _, candidate := range s.controller.List(ctx) { + if err := candidate.Spec.Supports(pod); err != nil { + errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", candidate.Name, err)) + } else { + provisioner = candidate + break + } + } + if provisioner == nil { + err := fmt.Errorf("matched 0/%d provisioners", len(multierr.Errors(errs))) + if errs != nil { + err = fmt.Errorf("%s, %w", err, errs) + } + return err + } + // Enqueue and wait for provisioning + logging.FromContext(ctx).Debugf("Scheduling pod to provisioner %q", provisioner.Name) + if err := provisioner.Enqueue(ctx, pod); err != nil { + return fmt.Errorf("provisioner %q failed", provisioner.Name) + } + return nil +} + +func isUnschedulable(p *v1.Pod) error { + if p.Spec.NodeName != "" { + return fmt.Errorf("already scheduled") + } + if !pod.FailedToSchedule(p) { + return fmt.Errorf("awaiting scheduling") + } + if pod.IsOwnedByDaemonSet(p) { + return fmt.Errorf("owned by daemonset") + } + if pod.IsOwnedByNode(p) { + return fmt.Errorf("owned by node") + } + return nil +} + +func validate(p *v1.Pod) error { + return multierr.Combine( + validateAffinity(p), + validateTopology(p), + ) +} + +func validateTopology(pod *v1.Pod) (errs error) { + for _, constraint := range pod.Spec.TopologySpreadConstraints { + if supported := sets.NewString(v1.LabelHostname, v1.LabelTopologyZone); !supported.Has(constraint.TopologyKey) { + errs = multierr.Append(errs, fmt.Errorf("unsupported topology key, %s not in %s", constraint.TopologyKey, supported)) + } + } + return errs +} + +func validateAffinity(pod *v1.Pod) (errs error) { + if pod.Spec.Affinity == nil { + return nil + } + if pod.Spec.Affinity.PodAffinity != nil { + errs = multierr.Append(errs, fmt.Errorf("pod affinity is not supported")) + } + if pod.Spec.Affinity.PodAntiAffinity != nil { + errs = multierr.Append(errs, fmt.Errorf("pod anti-affinity is not supported")) + } + if pod.Spec.Affinity.NodeAffinity != nil { + for _, term := range pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + errs = multierr.Append(errs, validateNodeSelectorTerm(term.Preference)) + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + errs = multierr.Append(errs, validateNodeSelectorTerm(term)) + } + } + } + return errs +} + +func validateNodeSelectorTerm(term v1.NodeSelectorTerm) (errs error) { + if term.MatchFields != nil { + errs = multierr.Append(errs, fmt.Errorf("node selector term with matchFields is not supported")) + } + if term.MatchExpressions != nil { + for _, requirement := range term.MatchExpressions { + if !sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn)).Has(string(requirement.Operator)) { + errs = multierr.Append(errs, fmt.Errorf("node selector term has unsupported operator, %s", requirement.Operator)) + } + } + } + return errs +} + +func (s *Scheduler) Register(_ context.Context, m manager.Manager) error { + return controllerruntime. + NewControllerManagedBy(m). + Named("scheduling"). + For(&v1.Pod{}). + WithOptions(controller.Options{MaxConcurrentReconciles: 10000}). 
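The provisioner-selection loop in Scheduler.Schedule above is a first-match scan that keeps every rejection so the final error can say why each candidate was ruled out. A minimal sketch of that pattern with go.uber.org/multierr follows; the candidate type and its supports method are placeholders standing in for Provisioner and Constraints.Supports, not the real API.

package main

import (
	"fmt"

	"go.uber.org/multierr"
)

type candidate struct {
	name  string
	zones map[string]bool
}

// supports is a placeholder for Constraints.Supports.
func (c candidate) supports(zone string) error {
	if !c.zones[zone] {
		return fmt.Errorf("zone %q not allowed", zone)
	}
	return nil
}

// pick returns the first candidate that supports the request, otherwise an
// aggregate error naming every rejection.
func pick(candidates []candidate, zone string) (*candidate, error) {
	var errs error
	for _, c := range candidates {
		if err := c.supports(zone); err != nil {
			errs = multierr.Append(errs, fmt.Errorf("tried provisioner/%s: %w", c.name, err))
			continue
		}
		return &c, nil
	}
	return nil, fmt.Errorf("matched 0/%d provisioners, %w", len(multierr.Errors(errs)), errs)
}

func main() {
	_, err := pick([]candidate{
		{name: "default", zones: map[string]bool{"us-east-1a": true}},
		{name: "gpu", zones: map[string]bool{"us-east-1b": true}},
	}, "us-east-1c")
	fmt.Println(err)
}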
+ Complete(s) +} diff --git a/pkg/controllers/allocation/scheduling/preferences.go b/pkg/controllers/provisioning/scheduling/preferences.go similarity index 86% rename from pkg/controllers/allocation/scheduling/preferences.go rename to pkg/controllers/provisioning/scheduling/preferences.go index e80a27dddda1..68a41389fa33 100644 --- a/pkg/controllers/allocation/scheduling/preferences.go +++ b/pkg/controllers/provisioning/scheduling/preferences.go @@ -42,24 +42,22 @@ func NewPreferences() *Preferences { } } -// Relax removes soft preferences from pods to enable scheduling if the cloud +// Relax removes soft preferences from pod to enable scheduling if the cloud // provider's capacity is constrained. For example, this can be leveraged to // prefer a specific zone, but relax the preferences if the pod cannot be // scheduled to that zone. Preferences are removed iteratively until only hard // constraints remain. Pods relaxation is reset (forgotten) after 5 minutes. -func (p *Preferences) Relax(ctx context.Context, pods []*v1.Pod) { - for _, pod := range pods { - affinity, ok := p.cache.Get(string(pod.UID)) - // Add to cache if we've never seen it before - if !ok { - p.cache.SetDefault(string(pod.UID), pod.Spec.Affinity) - continue - } - // Attempt to relax the pod and update the cache - pod.Spec.Affinity = affinity.(*v1.Affinity) - if relaxed := p.relax(ctx, pod); relaxed { - p.cache.SetDefault(string(pod.UID), pod.Spec.Affinity) - } +func (p *Preferences) Relax(ctx context.Context, pod *v1.Pod) { + affinity, ok := p.cache.Get(string(pod.UID)) + // Add to cache if we've never seen it before + if !ok { + p.cache.SetDefault(string(pod.UID), pod.Spec.Affinity) + return + } + // Attempt to relax the pod and update the cache + pod.Spec.Affinity = affinity.(*v1.Affinity) + if relaxed := p.relax(ctx, pod); relaxed { + p.cache.SetDefault(string(pod.UID), pod.Spec.Affinity) } } diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go similarity index 88% rename from pkg/controllers/allocation/scheduling/scheduler.go rename to pkg/controllers/provisioning/scheduling/scheduler.go index 795d65c88060..4d29810c76ac 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -21,7 +21,6 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/metrics" - "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" appsv1 "k8s.io/api/apps/v1" @@ -51,7 +50,6 @@ type Scheduler struct { CloudProvider cloudprovider.CloudProvider KubeClient client.Client Topology *Topology - Preferences *Preferences } type Schedule struct { @@ -67,37 +65,27 @@ func NewScheduler(kubeClient client.Client, cloudProvider cloudprovider.CloudPro CloudProvider: cloudProvider, KubeClient: kubeClient, Topology: &Topology{kubeClient: kubeClient}, - Preferences: NewPreferences(), } } func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) (schedules []*Schedule, err error) { defer metrics.Measure(schedulingDuration.WithLabelValues(provisioner.Name))() - - constraints := provisioner.Spec.Constraints.DeepCopy() - constraints.Labels = functional.UnionStringMaps(constraints.Labels, 
map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}) - constraints.Requirements = provisioner.Spec.Requirements. - With(globalRequirements(instanceTypes)). - With(v1alpha5.LabelRequirements(constraints.Labels)) - - // Relax preferences if pods have previously failed to schedule. - s.Preferences.Relax(ctx, pods) // Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It // lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors. - if err := s.Topology.Inject(ctx, constraints, pods); err != nil { + if err := s.Topology.Inject(ctx, &provisioner.Spec.Constraints, pods); err != nil { return nil, fmt.Errorf("injecting topology, %w", err) } // Separate pods into schedules of isomorphic scheduling constraints. - schedules, err = s.getSchedules(ctx, constraints, pods) + schedules, err = s.getSchedules(ctx, &provisioner.Spec.Constraints, pods) if err != nil { return nil, fmt.Errorf("getting schedules, %w", err) } return schedules, nil } -func globalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) { +func GlobalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) { supported := map[string]sets.String{ v1.LabelInstanceTypeStable: sets.NewString(), v1.LabelTopologyZone: sets.NewString(), diff --git a/pkg/controllers/allocation/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go similarity index 70% rename from pkg/controllers/allocation/scheduling/suite_test.go rename to pkg/controllers/provisioning/scheduling/suite_test.go index a755f0ee59e2..d1e5cce12c0d 100644 --- a/pkg/controllers/allocation/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -22,9 +22,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" - "github.com/awslabs/karpenter/pkg/controllers/allocation" - "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/test" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,7 +38,8 @@ import ( var ctx context.Context var provisioner *v1alpha5.Provisioner -var controller *allocation.Controller +var controller *provisioning.Controller +var scheduler *provisioning.Scheduler var env *test.Environment func TestAPIs(t *testing.T) { @@ -51,21 +50,11 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { + provisioning.MinBatchDuration = 0 // Speed up testing by not batching cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(ctx, cloudProvider) - controller = &allocation.Controller{ - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Filter: &allocation.Filter{KubeClient: e.Client}, - Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), - Launcher: &allocation.Launcher{ - Packer: &binpacking.Packer{}, - KubeClient: e.Client, - CoreV1Client: corev1.NewForConfigOrDie(e.Config), - CloudProvider: cloudProvider, - }, - KubeClient: e.Client, - CloudProvider: cloudProvider, - } + 
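In the scheduling/scheduler.go hunk above, globalRequirements is renamed to the exported GlobalRequirements, but only the initialization of its supported map is visible. A sketch of the aggregation that map implies, assuming each instance type contributes its name and zones to per-label value sets; the instanceType struct below is a simplified stand-in for the real cloudprovider.InstanceType interface, not code from this patch:

package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// instanceType is a simplified stand-in for cloudprovider.InstanceType.
type instanceType struct {
	name  string
	zones []string
}

// globalOfferings collects, per well-known label, every value that at least
// one instance type can satisfy; these sets would then back the "In"
// requirements handed to the scheduler.
func globalOfferings(instanceTypes []instanceType) map[string]sets.String {
	supported := map[string]sets.String{
		v1.LabelInstanceTypeStable: sets.NewString(),
		v1.LabelTopologyZone:       sets.NewString(),
	}
	for _, it := range instanceTypes {
		supported[v1.LabelInstanceTypeStable].Insert(it.name)
		supported[v1.LabelTopologyZone].Insert(it.zones...)
	}
	return supported
}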
controller = provisioning.NewController(ctx, e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider) + scheduler = provisioning.NewScheduler(e.Client, controller) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") }) @@ -85,209 +74,199 @@ var _ = AfterEach(func() { ExpectCleanedUp(env.Client) }) -var _ = Describe("Combining Constraints", func() { +var _ = Describe("Combined Constraints", func() { Context("Custom Labels", func() { It("should schedule unconstrained pods that don't have matching node selectors", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) }) It("should not schedule pods that have conflicting node selectors", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod(test.PodOptions{ - NodeSelector: map[string]string{"test-key": "different-value"}, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeSelector: map[string]string{"test-key": "different-value"}}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule pods that have matching requirements", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, }}, - )) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) }) It("should not schedule pods that have conflicting requirements", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"another-value"}}, }}, - )) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule pods that have matching preferences", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( test.PodOptions{NodePreferences: 
[]v1.NodeSelectorRequirement{ {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"another-value", "test-value"}}, }}, - )) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) }) It("should not schedule pods with have conflicting preferences", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( test.PodOptions{NodePreferences: []v1.NodeSelectorRequirement{ {Key: "test-key", Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-value"}}, }}, - )) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) }) Context("Well Known Labels", func() { It("should use provisioner constraints", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod())[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) }) It("should use node selectors", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}})) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}}, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) }) It("should not schedule the pod if nodeselector unknown", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "unknown"}})) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "unknown"}}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should not schedule if node selector outside of provisioner constraints", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, 
controller, provisioner, - test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}})) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule compatible requirements with Operator=In", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ - NodeRequirements: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}}}, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}}, + }}, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) }) It("should not schedule incompatible preferences and requirements with Operator=In", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ - NodeRequirements: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}}, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}, + }}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule compatible requirements with Operator=NotIn", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ - NodeRequirements: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "unknown"}}}, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "unknown"}}, + }}, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) }) It("should not schedule incompatible preferences and requirements with Operator=NotIn", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeRequirements: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, - })) - 
Expect(pods[0].Spec.NodeName).To(BeEmpty()) + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}, + }}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule compatible preferences and requirements with Operator=In", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2", "unknown"}}}, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) }) It("should not schedule incompatible preferences and requirements with Operator=In", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}}, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) + It("should schedule compatible preferences and requirements with Operator=NotIn", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-3"}}}, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) }) It("should not schedule incompatible preferences and requirements with Operator=NotIn", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: 
v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should schedule compatible node selectors, preferences and requirements", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-3"}, NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) }) It("should not schedule incompatible node selectors, preferences and requirements", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-3"}, NodeRequirements: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-3"}}}, NodePreferences: []v1.NodeSelectorRequirement{ {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-2", "test-zone-3"}}}, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should combine multidimensional node selectors, preferences and requirements", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeSelector: map[string]string{ v1.LabelTopologyZone: "test-zone-3", v1.LabelInstanceTypeStable: "arm-instance-type", @@ -300,15 +279,15 @@ var _ = Describe("Combining Constraints", func() { {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"unnknown"}}, {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpNotIn, Values: []string{"unknown"}}, }, - })) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + }, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "arm-instance-type")) }) It("should not combine incompatible multidimensional node selectors, preferences and requirements", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ NodeSelector: 
map[string]string{ v1.LabelTopologyZone: "test-zone-3", v1.LabelInstanceTypeStable: "arm-instance-type", @@ -321,8 +300,9 @@ var _ = Describe("Combining Constraints", func() { {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-3"}}, {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpNotIn, Values: []string{"arm-instance-type"}}, }, - })) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) }) }) @@ -340,16 +320,13 @@ var _ = Describe("Preferential Fallback", func() { {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, // Should not be relaxed }}, }}}} - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod) // Don't relax - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Don't relax - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) }) It("should relax multiple terms", func() { pod := test.UnschedulablePod() @@ -367,20 +344,15 @@ var _ = Describe("Preferential Fallback", func() { {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, // OR operator, never get to this one }}, }}}} - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod) // Remove first term - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) // Remove second term - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) // Success - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - node := ExpectNodeExists(env.Client, pod.Spec.NodeName) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1")) }) }) @@ -399,20 +371,15 @@ var _ = Describe("Preferential Fallback", func() { }}, }, }}} - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod) // Remove first term - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) // Remove second term - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod 
= ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) // Success - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - ExpectNodeExists(env.Client, pod.Spec.NodeName) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectScheduled(ctx, env.Client, pod) }) It("should relax to use lighter weights", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}} @@ -434,47 +401,46 @@ var _ = Describe("Preferential Fallback", func() { }}, }, }}} - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pod) // Remove heaviest term - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - Expect(pod.Spec.NodeName).To(BeEmpty()) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) // Success - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace) - node := ExpectNodeExists(env.Client, pod.Spec.NodeName) + pod = ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) }) }) }) var _ = Describe("Topology", func() { + + BeforeEach(func() { + provisioning.MinBatchDuration = 1 * time.Second + }) + labels := map[string]string{"test": "test"} It("should ignore unknown topology keys", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: []v1.TopologySpreadConstraint{{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{Labels: labels, TopologySpreadConstraints: []v1.TopologySpreadConstraint{{ TopologyKey: "unknown", WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, - }}}), - ) - Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }}}, + ))[0] + ExpectNotScheduled(ctx, env.Client, pod) }) Context("Zonal", func() { It("should balance pods across zones", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), @@ -484,14 +450,13 @@ var _ = Describe("Topology", func() { }) It("should respect provisioner zonal constraints", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, 
Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}} - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), @@ -510,7 +475,7 @@ var _ = Describe("Topology", func() { LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.Pod(test.PodOptions{Labels: labels}), // ignored, pending test.Pod(test.PodOptions{}), // ignored, missing labels test.Pod(test.PodOptions{Labels: labels, NodeName: firstNode.Name}), @@ -528,14 +493,13 @@ var _ = Describe("Topology", func() { Context("Hostname", func() { It("should balance pods across nodes", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelHostname, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), @@ -544,14 +508,13 @@ var _ = Describe("Topology", func() { ExpectSkew(env.Client, v1.LabelHostname).To(ConsistOf(1, 1, 1, 1)) }) It("should balance pods on the same hostname up to maxskew", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelHostname, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 4, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), test.UnschedulablePod(test.PodOptions{Labels: labels, TopologySpreadConstraints: topology}), @@ -563,7 +526,6 @@ var _ = Describe("Topology", func() { Context("Combined Hostname and Zonal Topology", func() { It("should spread pods while respecting both constraints", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, @@ -575,25 +537,25 @@ var _ = Describe("Topology", func() { LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 3, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, MakePods(2, test.PodOptions{Labels: labels, TopologySpreadConstraints: topology})..., ) 
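The zonal skew assertions that follow are easiest to verify with a simplified model: with whenUnsatisfiable set to DoNotSchedule and maxSkew of 1, each new pod effectively lands in the currently least-loaded zone, so zone counts never drift more than one apart. A small sketch of that model, not Karpenter code:

package sketch

// spread places n pods greedily into the least-loaded zone, which reproduces
// the per-zone counts asserted in these maxSkew=1 tests.
func spread(zones []string, n int) map[string]int {
	counts := map[string]int{}
	for i := 0; i < n; i++ {
		least := zones[0]
		for _, zone := range zones[1:] {
			if counts[zone] < counts[least] {
				least = zone
			}
		}
		counts[least]++
	}
	return counts
}

Run over the three test zones, the model gives 1/1/0 after the first two pods (only two zones appear in the skew), 2/2/1 after three more, 4/3/3 after another five, and 7/7/7 after the final eleven, matching the ConsistOf expectations below.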
ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(1, 1)) ExpectSkew(env.Client, v1.LabelHostname).ToNot(ContainElements(BeNumerically(">", 3))) - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, MakePods(3, test.PodOptions{Labels: labels, TopologySpreadConstraints: topology})..., ) ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(2, 2, 1)) ExpectSkew(env.Client, v1.LabelHostname).ToNot(ContainElements(BeNumerically(">", 3))) - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, MakePods(5, test.PodOptions{Labels: labels, TopologySpreadConstraints: topology})..., ) ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(4, 3, 3)) ExpectSkew(env.Client, v1.LabelHostname).ToNot(ContainElements(BeNumerically(">", 3))) - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, MakePods(11, test.PodOptions{Labels: labels, TopologySpreadConstraints: topology})..., ) ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(7, 7, 7)) @@ -604,14 +566,13 @@ var _ = Describe("Topology", func() { // https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/#interaction-with-node-affinity-and-node-selectors Context("Combined Zonal Topology and Affinity", func() { It("should limit spread options by nodeSelector", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, append( MakePods(5, test.PodOptions{ Labels: labels, @@ -628,14 +589,13 @@ var _ = Describe("Topology", func() { ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(5, 5)) }) It("should limit spread options by node affinity", func() { - ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, MaxSkew: 1, }} - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, append( + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, append( MakePods(6, test.PodOptions{ Labels: labels, TopologySpreadConstraints: topology, @@ -652,7 +612,7 @@ var _ = Describe("Topology", func() { })..., )...) 
ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(4, 3)) - ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, MakePods(5, test.PodOptions{ Labels: labels, TopologySpreadConstraints: topology, @@ -666,59 +626,42 @@ var _ = Describe("Topology", func() { var _ = Describe("Taints", func() { It("should taint nodes with provisioner taints", func() { provisioner.Spec.Taints = []v1.Taint{{Key: "test", Value: "bar", Effect: v1.TaintEffectNoSchedule}} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Effect: v1.TaintEffectNoSchedule, Operator: v1.TolerationOpExists}}})) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod( + test.PodOptions{Tolerations: []v1.Toleration{{Effect: v1.TaintEffectNoSchedule, Operator: v1.TolerationOpExists}}}, + ))[0] + node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Spec.Taints).To(ContainElement(provisioner.Spec.Taints[0])) }) It("should schedule pods that tolerate provisioner constraints", func() { provisioner.Spec.Taints = []v1.Taint{{Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoSchedule}} - schedulable := []client.Object{ + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, // Tolerates with OpExists test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule}}}), // Tolerates with OpEqual test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Value: "test-value", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}}}), + ) { + ExpectScheduled(ctx, env.Client, pod) } - unschedulable := []client.Object{ + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, // Missing toleration test.UnschedulablePod(), // key mismatch with OpExists test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "invalid", Operator: v1.TolerationOpExists}}}), // value mismatch test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}}}), - } - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, schedulable...) - ExpectCreatedWithStatus(env.Client, unschedulable...) 
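The schedulable and unschedulable tolerations above follow the standard Kubernetes toleration-matching rule; a simplified sketch of that rule (not code from this patch):

package sketch

import v1 "k8s.io/api/core/v1"

// tolerates reports whether a toleration matches a taint: key and effect must
// match when set, OpExists ignores the value, and OpEqual (the default) also
// requires the value to match.
func tolerates(taint v1.Taint, tol v1.Toleration) bool {
	if tol.Key != "" && tol.Key != taint.Key {
		return false
	}
	if tol.Effect != "" && tol.Effect != taint.Effect {
		return false
	}
	if tol.Operator == v1.TolerationOpExists {
		return true
	}
	return tol.Value == taint.Value // v1.TolerationOpEqual or unset
}

Under this rule the OpExists and OpEqual pods above tolerate the provisioner's test-key=test-value:NoSchedule taint, while the pods with a missing toleration, a mismatched key, or a missing value do not.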
- ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(Equal(1)) - Expect(nodes.Items[0].Spec.Taints[0]).To(Equal(provisioner.Spec.Taints[0])) - for _, pod := range schedulable { - scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - ExpectNodeExists(env.Client, scheduled.Spec.NodeName) - } - for _, pod := range unschedulable { - unscheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - Expect(unscheduled.Spec.NodeName).To(BeEmpty()) + ) { + ExpectNotScheduled(ctx, env.Client, pod) } }) It("should not generate taints for OpExists", func() { - pods := []client.Object{ + pod := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute}}}), - } - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pods...) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(Equal(1)) + )[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Spec.Taints).To(HaveLen(2)) // Expect no taints generated beyond defaults }) It("should generate taints for pod tolerations", func() { - pods := []client.Object{ + pods := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, // Matching pods schedule together on a node with a matching taint test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{ {Key: "test-key", Operator: v1.TolerationOpEqual, Value: "test-value", Effect: v1.TaintEffectNoSchedule}}, @@ -744,15 +687,7 @@ var _ = Describe("Taints", func() { }), // // No taint generated test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute}}}), - } - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, pods...) 
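The expectations in this pair of taint tests also imply how provisional taints are generated from tolerations: only a fully specified toleration (key, value and effect, i.e. OpEqual) can be materialized as a concrete node taint, while an OpExists toleration produces nothing, which is why that pod ends up on a node carrying only the default taints. A sketch of that inferred rule; the actual generation logic lives outside this hunk, so treat this as an assumption:

package sketch

import v1 "k8s.io/api/core/v1"

// taintsFor derives node taints from pod tolerations, skipping any toleration
// that is not concrete enough to name a specific taint.
func taintsFor(tolerations []v1.Toleration) (taints []v1.Taint) {
	for _, t := range tolerations {
		if t.Operator == v1.TolerationOpExists || t.Key == "" || t.Value == "" || t.Effect == "" {
			continue
		}
		taints = append(taints, v1.Taint{Key: t.Key, Value: t.Value, Effect: t.Effect})
	}
	return taints
}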
- ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(Equal(6)) - + ) for i, expectedTaintsPerNode := range [][]v1.Taint{ {{Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoSchedule}}, {{Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoSchedule}}, @@ -761,15 +696,12 @@ var _ = Describe("Taints", func() { {{Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoExecute}}, {{Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoSchedule}, {Key: "test-key", Value: "test-value", Effect: v1.TaintEffectNoExecute}}, } { - pod := ExpectPodExists(env.Client, pods[i].GetName(), pods[i].GetNamespace()) - node := ExpectNodeExists(env.Client, pod.Spec.NodeName) + node := ExpectScheduled(ctx, env.Client, pods[i]) for _, taint := range expectedTaintsPerNode { Expect(node.Spec.Taints).To(ContainElement(taint)) } } - - pod := ExpectPodExists(env.Client, pods[len(pods)-1].GetName(), pods[len(pods)-1].GetNamespace()) - node := ExpectNodeExists(env.Client, pod.Spec.NodeName) + node := ExpectScheduled(ctx, env.Client, pods[len(pods)-1]) Expect(node.Spec.Taints).To(HaveLen(2)) // Expect no taints generated beyond defaults }) }) diff --git a/pkg/controllers/allocation/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go similarity index 100% rename from pkg/controllers/allocation/scheduling/topology.go rename to pkg/controllers/provisioning/scheduling/topology.go diff --git a/pkg/controllers/allocation/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go similarity index 100% rename from pkg/controllers/allocation/scheduling/topologygroup.go rename to pkg/controllers/provisioning/scheduling/topologygroup.go diff --git a/pkg/controllers/allocation/suite_test.go b/pkg/controllers/provisioning/suite_test.go similarity index 54% rename from pkg/controllers/allocation/suite_test.go rename to pkg/controllers/provisioning/suite_test.go index 8ac4faa6a46c..e995eb8c516d 100644 --- a/pkg/controllers/allocation/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -12,22 +12,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package allocation_test +package provisioning_test import ( "context" "testing" - "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" - "github.com/awslabs/karpenter/pkg/controllers/allocation" - "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/test" - "github.com/awslabs/karpenter/pkg/utils/resources" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -38,11 +35,11 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . 
"knative.dev/pkg/logging/testing" - "sigs.k8s.io/controller-runtime/pkg/client" ) var ctx context.Context -var controller *allocation.Controller +var controller *provisioning.Controller +var scheduler *provisioning.Scheduler var env *test.Environment func TestAPIs(t *testing.T) { @@ -53,21 +50,11 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { + provisioning.MinBatchDuration = 0 // Don't wait for batches when testing cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(ctx, cloudProvider) - controller = &allocation.Controller{ - CloudProvider: cloudProvider, - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Filter: &allocation.Filter{KubeClient: e.Client}, - Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), - Launcher: &allocation.Launcher{ - Packer: &binpacking.Packer{}, - KubeClient: e.Client, - CoreV1Client: corev1.NewForConfigOrDie(e.Config), - CloudProvider: cloudProvider, - }, - KubeClient: e.Client, - } + controller = provisioning.NewController(ctx, e.Client, corev1.NewForConfigOrDie(e.Config), cloudProvider) + scheduler = provisioning.NewScheduler(e.Client, controller) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") }) @@ -76,7 +63,7 @@ var _ = AfterSuite(func() { Expect(env.Stop()).To(Succeed(), "Failed to stop environment") }) -var _ = Describe("Allocation", func() { +var _ = Describe("Provisioning", func() { var provisioner *v1alpha5.Provisioner BeforeEach(func() { provisioner = &v1alpha5.Provisioner{ @@ -92,20 +79,17 @@ var _ = Describe("Allocation", func() { }) Context("Reconcilation", func() { - It("should provision nodes for unschedulable pods", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(), test.UnschedulablePod(), - ) + It("should provision nodes", func() { + pods := ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod()) nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) Expect(len(nodes.Items)).To(Equal(1)) for _, pod := range pods { - Expect(pod.Spec.NodeName).To(Equal(nodes.Items[0].Name)) + ExpectScheduled(ctx, env.Client, pod) } }) It("should provision nodes for pods with supported node selectors", func() { - schedulable := []client.Object{ + schedulable := []*v1.Pod{ // Constrained by provisioner test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}), // Constrained by zone @@ -115,7 +99,7 @@ var _ = Describe("Allocation", func() { // Constrained by architecture test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: "arm64"}}), } - unschedulable := []client.Object{ + unschedulable := []*v1.Pod{ // Ignored, matches another provisioner test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.ProvisionerNameLabelKey: "unknown"}}), // Ignored, invalid zone @@ -129,25 +113,15 @@ var _ = Describe("Allocation", func() { // Ignored, label selector does not match test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{"foo": "bar"}}), } - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, schedulable...) - ExpectCreatedWithStatus(env.Client, unschedulable...) 
- ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - for _, pod := range schedulable { - scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - ExpectNodeExists(env.Client, scheduled.Spec.NodeName) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, schedulable...) { + ExpectScheduled(ctx, env.Client, pod) } - for _, pod := range unschedulable { - ExpectNotScheduled(env.Client, pod.GetName(), pod.GetNamespace()) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, unschedulable...) { + ExpectNotScheduled(ctx, env.Client, pod) } - Expect(len(nodes.Items)).To(Equal(4)) }) It("should provision nodes for accelerators", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}}, }), @@ -157,30 +131,23 @@ var _ = Describe("Allocation", func() { test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}}, }), - ) - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(Equal(3)) - for _, pod := range pods { - scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - ExpectNodeExists(env.Client, scheduled.Spec.NodeName) + ) { + ExpectScheduled(ctx, env.Client, pod) } }) It("should account for daemonsets", func() { - daemonsets := []client.Object{ - &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"}, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}}, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}}, - Spec: test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, - }).Spec, - }}, - }, - } - schedulable := []client.Object{ + ExpectCreated(env.Client, &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "daemons", Namespace: "default"}, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "test"}}, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}}, + Spec: test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, + }).Spec, + }}, + }) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")}}, }), @@ -190,42 +157,32 @@ var _ = Describe("Allocation", func() { test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: 
resource.MustParse("1Gi")}}, }), + ) { + node := ExpectScheduled(ctx, env.Client, pod) + Expect(*node.Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4"))) + Expect(*node.Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi"))) } - ExpectCreated(env.Client, provisioner) - ExpectCreatedWithStatus(env.Client, daemonsets...) - ExpectCreatedWithStatus(env.Client, schedulable...) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - - nodes := &v1.NodeList{} - Expect(env.Client.List(ctx, nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(Equal(1)) - for _, pod := range schedulable { - scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - ExpectNodeExists(env.Client, scheduled.Spec.NodeName) - } - Expect(*nodes.Items[0].Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4"))) - Expect(*nodes.Items[0].Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi"))) }) - Context("Labels", func() { It("should label nodes", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value", "test-key-2": "test-value-2"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.ProvisionerNameLabelKey, provisioner.Name)) - Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) - Expect(node.Labels).To(HaveKeyWithValue("test-key-2", "test-value-2")) - Expect(node.Labels).To(HaveKey(v1.LabelTopologyZone)) - Expect(node.Labels).To(HaveKey(v1.LabelInstanceTypeStable)) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod()) { + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.ProvisionerNameLabelKey, provisioner.Name)) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + Expect(node.Labels).To(HaveKeyWithValue("test-key-2", "test-value-2")) + Expect(node.Labels).To(HaveKey(v1.LabelTopologyZone)) + Expect(node.Labels).To(HaveKey(v1.LabelInstanceTypeStable)) + } }) }) Context("Taints", func() { It("should apply unready taints", func() { ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Spec.Taints).To(ContainElement(v1.Taint{Key: v1alpha5.NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule})) + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, controller, provisioner, test.UnschedulablePod()) { + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Spec.Taints).To(ContainElement(v1.Taint{Key: v1alpha5.NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule})) + } }) }) }) diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 33e820a2ada5..180386af3d17 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -17,6 +17,7 @@ package expecations import ( "context" "fmt" + "sync" "time" //nolint:revive,stylecheck @@ -30,7 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" - "github.com/awslabs/karpenter/pkg/utils/pretty" + "github.com/awslabs/karpenter/pkg/controllers/provisioning" ) const ( @@ -60,18 +61,25 @@ func 
ExpectNotFound(c client.Client, objects ...client.Object) { } } -func ExpectScheduled(c client.Client, name string, namespace string) { - pod := ExpectPodExists(c, name, namespace) - Eventually(pod.Spec.NodeName).ShouldNot(BeEmpty(), func() string { - return fmt.Sprintf("expected pod to scheduled, but it wasn't: %s", pretty.Concise(pod.Spec)) - }) +func ExpectScheduled(ctx context.Context, c client.Client, pod *v1.Pod) *v1.Node { + p := ExpectPodExists(c, pod.Name, pod.Namespace) + Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) + return ExpectNodeExists(c, p.Spec.NodeName) } -func ExpectNotScheduled(c client.Client, name string, namespace string) { - pod := ExpectPodExists(c, name, namespace) - Eventually(pod.Spec.NodeName).Should(BeEmpty(), func() string { - return fmt.Sprintf("expected pod to not be scheduled, but it was: %s", pretty.Concise(pod.Spec)) - }) +func ExpectNotScheduled(ctx context.Context, c client.Client, pod *v1.Pod) { + p := ExpectPodExists(c, pod.Name, pod.Namespace) + Eventually(p.Spec.NodeName).Should(BeEmpty(), fmt.Sprintf("expected %s/%s to not be scheduled", pod.Namespace, pod.Name)) +} + +func ExpectApplied(c client.Client, objects ...client.Object) { + for _, object := range objects { + if object.GetResourceVersion() == "" { + Expect(c.Create(context.Background(), object)).To(Succeed()) + } else { + Expect(c.Update(context.Background(), object)).To(Succeed()) + } + } } func ExpectCreated(c client.Client, objects ...client.Object) { @@ -84,7 +92,7 @@ func ExpectCreatedWithStatus(c client.Client, objects ...client.Object) { for _, object := range objects { // Preserve a copy of the status, which is overriden by create status := object.DeepCopyObject().(client.Object) - ExpectCreated(c, object) + ExpectApplied(c, object) Expect(c.Status().Update(context.Background(), status)).To(Succeed()) } } @@ -127,6 +135,30 @@ func ExpectCleanedUp(c client.Client) { } } +func ExpectProvisioned(ctx context.Context, c client.Client, scheduler *provisioning.Scheduler, controller *provisioning.Controller, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) (result []*v1.Pod) { + // Persist objects + ExpectApplied(c, provisioner) + for _, pod := range pods { + ExpectCreatedWithStatus(c, pod) + } + // Wait for reconcile + ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) + wg := sync.WaitGroup{} + for _, pod := range pods { + wg.Add(1) + go func(pod *v1.Pod) { + ExpectReconcileSucceeded(ctx, scheduler, client.ObjectKeyFromObject(pod)) + wg.Done() + }(pod) + } + wg.Wait() + // Return updated pods + for _, pod := range pods { + result = append(result, ExpectPodExists(c, pod.GetName(), pod.GetNamespace())) + } + return result +} + func ExpectProvisioningSucceeded(ctx context.Context, c client.Client, reconciler reconcile.Reconciler, provisioner *v1alpha5.Provisioner, pods ...*v1.Pod) []*v1.Pod { for _, pod := range pods { ExpectCreatedWithStatus(c, pod)
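For tests that need to interleave extra steps between the two reconcile passes, the new ExpectProvisioned helper above can be unrolled by hand. The equivalent sequence for a single pod, written as a fragment that reuses the helpers and suite variables appearing earlier in this diff:

ExpectApplied(env.Client, provisioner)
ExpectCreatedWithStatus(env.Client, pod)
// First pass: the provisioning controller reconciles the provisioner itself.
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))
// Second pass: the scheduler reconciles the pod and routes it to the matching provisioner.
ExpectReconcileSucceeded(ctx, scheduler, client.ObjectKeyFromObject(pod))
// Re-fetch the pod to observe any node name written during provisioning.
pod = ExpectPodExists(env.Client, pod.Name, pod.Namespace)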