diff --git a/pkg/cloudprovider/aws/fake/ec2api.go b/pkg/cloudprovider/aws/fake/ec2api.go index 7bbfe09a8da5..42bea1e3f6e2 100644 --- a/pkg/cloudprovider/aws/fake/ec2api.go +++ b/pkg/cloudprovider/aws/fake/ec2api.go @@ -26,11 +26,13 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/utils/functional" set "github.com/deckarep/golang-set" ) type CapacityPool struct { + CapacityType string InstanceType string Zone string } @@ -80,11 +82,18 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee instances := []*ec2.Instance{} instanceIds := []*string{} skippedPools := []CapacityPool{} + var spotInstanceRequestID *string + + if aws.StringValue(input.TargetCapacitySpecification.DefaultTargetCapacityType) == v1alpha1.CapacityTypeSpot { + spotInstanceRequestID = aws.String(randomdata.SillyName()) + } + for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ { skipInstance := false for _, pool := range e.InsufficientCapacityPools { if pool.InstanceType == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) && - pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) { + pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) && + pool.CapacityType == aws.StringValue(input.TargetCapacitySpecification.DefaultTargetCapacityType) { skippedPools = append(skippedPools, pool) skipInstance = true break @@ -94,10 +103,11 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee continue } instances = append(instances, &ec2.Instance{ - InstanceId: aws.String(randomdata.SillyName()), - Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone}, - PrivateDnsName: 
aws.String(randomdata.IpV4Address()), - InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType, + InstanceId: aws.String(randomdata.SillyName()), + Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone}, + PrivateDnsName: aws.String(randomdata.IpV4Address()), + InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType, + SpotInstanceRequestId: spotInstanceRequestID, }) e.Instances.Store(*instances[i].InstanceId, instances[i]) instanceIds = append(instanceIds, instances[i].InstanceId) diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index e14418ec5670..7108d67ddce2 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -106,16 +106,8 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { } func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) { - // Default to on-demand unless constrained otherwise or if flexible to spot and - // on-demand. This code assumes two options: {spot, on-demand}, which is enforced - // by constraints.Constrain(). Spot may be selected by constraining the provisioner, - // or using nodeSelectors, required node affinity, or preferred node affinity. 
- capacityType := v1alpha1.CapacityTypeOnDemand - if capacityTypes := constraints.Requirements.CapacityTypes(); len(capacityTypes) == 0 { - return nil, fmt.Errorf("invariant violated, must contain at least one capacity type") - } else if len(capacityTypes) == 1 { - capacityType = capacityTypes.UnsortedList()[0] - } + capacityType := p.getCapacityType(constraints, instanceTypes) + // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, constraints, instanceTypes, capacityType) if err != nil { @@ -183,7 +175,6 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst var overrides []*ec2.FleetLaunchTemplateOverridesRequest for i, instanceType := range instanceTypeOptions { for _, offering := range instanceType.Offerings() { - // we can't assume that all zones will be available for all capacity types, hence this check if capacityType != offering.CapacityType { continue } @@ -285,6 +276,22 @@ func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, } } +// getCapacityType selects spot if both constraints are flexible and there is an +// available offering. The AWS Cloud Provider defaults to [ on-demand ], so spot +// must be explicitly included in capacity type requirements. 
+func (p *InstanceProvider) getCapacityType(constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType) string { + if constraints.Requirements.CapacityTypes().Has(v1alpha1.CapacityTypeSpot) { + for _, instanceType := range instanceTypes { + for _, offering := range instanceType.Offerings() { + if constraints.Requirements.Zones().Has(offering.Zone) && offering.CapacityType == v1alpha1.CapacityTypeSpot { + return v1alpha1.CapacityTypeSpot + } + } + } + } + return v1alpha1.CapacityTypeOnDemand +} + func getInstanceID(node *v1.Node) (*string, error) { id := strings.Split(node.Spec.ProviderID, "/") if len(id) < 5 { @@ -305,11 +312,10 @@ func combineFleetErrors(errors []*ec2.CreateFleetError) (errs error) { } func getCapacityType(instance *ec2.Instance) string { - capacityType := v1alpha1.CapacityTypeOnDemand if instance.SpotInstanceRequestId != nil { - capacityType = v1alpha1.CapacityTypeSpot + return v1alpha1.CapacityTypeSpot } - return capacityType + return v1alpha1.CapacityTypeOnDemand } func combineFleetInstances(createFleetOutput ec2.CreateFleetOutput) []*string { diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index fa3663343d67..7818524d0c58 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -179,7 +179,7 @@ func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceTyp capacityType, InsufficientCapacityErrorCacheTTL) // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), sets.Empty{}) + p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{}) } func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 
3d39e1d87456..3592894c4a6b 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -183,7 +183,7 @@ var _ = Describe("Allocation", func() { }) Context("Insufficient Capacity Error Cache", func() { It("should launch instances of different type on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() { - fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}, @@ -213,7 +213,7 @@ var _ = Describe("Allocation", func() { Expect(nodeNames.Len()).To(Equal(2)) }) It("should launch instances in a different zone on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() { - fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}} + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}} pod := test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, @@ -238,7 +238,7 @@ var _ = Describe("Allocation", func() { HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b"))) }) It("should launch instances on later reconciliation attempt with Insufficient Capacity Error Cache expiry", func() { - fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeOnDemand, InstanceType: 
"inf1.6xlarge", Zone: "test-zone-1a"}} pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ NodeSelector: map[string]string{v1.LabelInstanceTypeStable: "inf1.6xlarge"}, @@ -256,26 +256,33 @@ var _ = Describe("Allocation", func() { node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) }) + It("should launch on-demand capacity if flexible to both spot and on demand, but spot if unavailable", func() { + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{CapacityType: v1alpha1.CapacityTypeSpot, InstanceType: "m5.large", Zone: "test-zone-1a"}} + provisioner.Spec.Requirements = v1alpha5.Requirements{ + {Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"m5.large"}}, + } + // Spot Unavailable + pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Fallback to OD + pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand)) + }) }) Context("CapacityType", func() { It("should default to on demand", func() { pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0] - ExpectScheduled(ctx, env.Client, pod) - Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) - input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) - Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) - 
Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeOnDemand)) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeOnDemand)) }) It("should launch spot capacity if flexible to both spot and on demand", func() { provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand}}} - pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, - test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha5.LabelCapacityType: v1alpha1.CapacityTypeSpot}}), - )[0] - ExpectScheduled(ctx, env.Client, pod) - Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) - input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) - Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) - Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeSpot)) + pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod())[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot)) }) }) Context("LaunchTemplates", func() { diff --git a/pkg/controllers/scheduling/controller.go b/pkg/controllers/scheduling/controller.go index f2f382b1804f..f34db1c73c58 100644 --- a/pkg/controllers/scheduling/controller.go +++ b/pkg/controllers/scheduling/controller.go @@ -76,7 +76,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco return reconcile.Result{}, err } provisioner.Add(ctx, pod) - return reconcile.Result{RequeueAfter: time.Second * 5}, nil + return reconcile.Result{RequeueAfter: time.Second * 1}, nil } func (c *Controller) selectProvisioner(ctx context.Context, pod 
*v1.Pod) (*provisioning.Provisioner, error) {