From 74c26010e01946960999c0e58c335842f05ed59f Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Fri, 25 Mar 2022 12:56:28 -0700 Subject: [PATCH] Simplified Cloudprovider Create API and delgate batching the provider specific implementation (#1575) --- pkg/cloudprovider/aws/cloudprovider.go | 43 +----- pkg/cloudprovider/aws/instance.go | 126 ++++++++---------- pkg/cloudprovider/aws/instancetypes.go | 4 + pkg/cloudprovider/aws/launchtemplate.go | 6 +- pkg/cloudprovider/aws/securitygroups.go | 4 + pkg/cloudprovider/aws/subnets.go | 4 + pkg/cloudprovider/fake/cloudprovider.go | 73 +++++----- pkg/cloudprovider/metrics/cloudprovider.go | 4 +- pkg/cloudprovider/types.go | 15 ++- pkg/controllers/provisioning/provisioner.go | 34 +++-- .../scheduling/instance_selection_test.go | 57 ++++---- .../provisioning/scheduling/suite_test.go | 23 +--- 12 files changed, 168 insertions(+), 225 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 896b2c0d2d4a..594a396f50ae 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -18,8 +18,6 @@ import ( "fmt" "time" - "github.com/aws/karpenter/pkg/utils/resources" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/ec2metadata" @@ -38,7 +36,6 @@ import ( "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/project" - "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/client-go/transport" "knative.dev/pkg/apis" @@ -101,24 +98,12 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud } // Create a node given the constraints. -func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { - vendorConstraints, err := v1alpha1.Deserialize(constraints) - if err != nil { - return err - } - instanceTypes = c.filterInstanceTypes(instanceTypes) - - // Create will only return an error if zero nodes could be launched. - // Partial fulfillment will be logged - nodes, err := c.instanceProvider.Create(ctx, vendorConstraints, instanceTypes, quantity) +func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { + vendorConstraints, err := v1alpha1.Deserialize(nodeRequest.Constraints) if err != nil { - return fmt.Errorf("launching instances, %w", err) + return nil, err } - var errs error - for _, node := range nodes { - errs = multierr.Append(errs, callback(node)) - } - return errs + return c.instanceProvider.Create(ctx, vendorConstraints, nodeRequest.InstanceTypeOptions) } // GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements) @@ -161,26 +146,6 @@ func (c *CloudProvider) Name() string { return "aws" } -// filterInstanceTypes is used to eliminate GPU instance types from the list of possible instance types when a -// non-GPU instance type will work. If the list of instance types consists of both GPU and non-GPU types, then only -// the non-GPU types will be returned. If it has only GPU types, the list will be returned unaltered. -func (c *CloudProvider) filterInstanceTypes(instanceTypes []cloudprovider.InstanceType) []cloudprovider.InstanceType { - var genericInstanceTypes []cloudprovider.InstanceType - for _, it := range instanceTypes { - itRes := it.Resources() - if resources.IsZero(itRes[v1alpha1.ResourceAWSNeuron]) && - resources.IsZero(itRes[v1alpha1.ResourceAMDGPU]) && - resources.IsZero(itRes[v1alpha1.ResourceNVIDIAGPU]) { - genericInstanceTypes = append(genericInstanceTypes, it) - } - } - // if we got some subset of non-GPU types, then prefer to use those - if len(genericInstanceTypes) != 0 { - return genericInstanceTypes - } - return instanceTypes -} - // get the current region from EC2 IMDS func getRegionFromIMDS(sess *session.Session) string { region, err := ec2metadata.New(sess).Region() diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index b6be7013c274..656bf1c57237 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -37,6 +37,7 @@ import ( "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/options" + "github.com/aws/karpenter/pkg/utils/resources" ) const ( @@ -68,45 +69,33 @@ func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceT // instanceTypes should be sorted by priority for spot capacity type. // If spot is not used, the instanceTypes are not required to be sorted // because we are using ec2 fleet's lowest-price OD allocation strategy -func (p *InstanceProvider) Create(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*v1.Node, error) { +func (p *InstanceProvider) Create(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType) (*v1.Node, error) { // Launch Instance - ids, err := p.launchInstances(ctx, constraints, instanceTypes, quantity) + instanceTypes = p.filterInstanceTypes(instanceTypes) + id, err := p.launchInstance(ctx, constraints, instanceTypes) if err != nil { return nil, err } // Get Instance with backoff retry since EC2 is eventually consistent - instances := []*ec2.Instance{} + instance := &ec2.Instance{} if err := retry.Do( - func() (err error) { instances, err = p.getInstances(ctx, ids); return err }, + func() (err error) { instance, err = p.getInstance(ctx, aws.StringValue(id)); return err }, retry.Delay(1*time.Second), retry.Attempts(6), - ); err != nil && len(instances) == 0 { + ); err != nil { return nil, err } else if err != nil { - logging.FromContext(ctx).Errorf("retrieving node name for %d/%d instances", quantity-len(instances), quantity) - } - - nodes := []*v1.Node{} - for _, instance := range instances { - logging.FromContext(ctx).Infof("Launched instance: %s, hostname: %s, type: %s, zone: %s, capacityType: %s", - aws.StringValue(instance.InstanceId), - aws.StringValue(instance.PrivateDnsName), - aws.StringValue(instance.InstanceType), - aws.StringValue(instance.Placement.AvailabilityZone), - getCapacityType(instance), - ) - // Convert Instance to Node - node, err := p.instanceToNode(ctx, instance, instanceTypes) - if err != nil { - logging.FromContext(ctx).Errorf("creating Node from an EC2 Instance: %s", err) - continue - } - nodes = append(nodes, node) - } - if len(nodes) == 0 { - return nil, fmt.Errorf("zero nodes were created") - } - return nodes, nil + logging.FromContext(ctx).Errorf("retrieving node name for instance %s", aws.StringValue(instance.InstanceId)) + } + logging.FromContext(ctx).Infof("Launched instance: %s, hostname: %s, type: %s, zone: %s, capacityType: %s", + aws.StringValue(instance.InstanceId), + aws.StringValue(instance.PrivateDnsName), + aws.StringValue(instance.InstanceType), + aws.StringValue(instance.Placement.AvailabilityZone), + getCapacityType(instance), + ) + // Convert Instance to Node + return p.instanceToNode(ctx, instance, instanceTypes), nil } func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { @@ -125,9 +114,8 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } -func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) { +func (p *InstanceProvider) launchInstance(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType) (*string, error) { capacityType := p.getCapacityType(constraints, instanceTypes) - // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, constraints, instanceTypes, capacityType) if err != nil { @@ -140,7 +128,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a LaunchTemplateConfigs: launchTemplateConfigs, TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ DefaultTargetCapacityType: aws.String(capacityType), - TotalTargetCapacity: aws.Int64(int64(quantity)), + TotalTargetCapacity: aws.Int64(1), }, TagSpecifications: []*ec2.TagSpecification{ {ResourceType: aws.String(ec2.ResourceTypeInstance), Tags: tags}, @@ -157,14 +145,10 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a return nil, fmt.Errorf("creating fleet %w", err) } p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType) - instanceIds := combineFleetInstances(*createFleetOutput) - if len(instanceIds) == 0 { + if len(createFleetOutput.Instances) == 0 || len(createFleetOutput.Instances[0].InstanceIds) == 0 { return nil, combineFleetErrors(createFleetOutput.Errors) - } else if len(instanceIds) != quantity { - logging.FromContext(ctx).Errorf("Failed to launch %d EC2 instances out of the %d EC2 instances requested: %s", - quantity-len(instanceIds), quantity, combineFleetErrors(createFleetOutput.Errors).Error()) } - return instanceIds, nil + return createFleetOutput.Instances[0].InstanceIds[0], nil } func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, capacityType string) ([]*ec2.FleetLaunchTemplateConfigRequest, error) { @@ -239,34 +223,28 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst return overrides } -func (p *InstanceProvider) getInstances(ctx context.Context, ids []*string) ([]*ec2.Instance, error) { - describeInstancesOutput, err := p.ec2api.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{InstanceIds: ids}) +func (p *InstanceProvider) getInstance(ctx context.Context, id string) (*ec2.Instance, error) { + describeInstancesOutput, err := p.ec2api.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{InstanceIds: aws.StringSlice([]string{id})}) if isNotFound(err) { return nil, err } if err != nil { return nil, fmt.Errorf("failed to describe ec2 instances, %w", err) } - describedInstances := combineReservations(describeInstancesOutput.Reservations) - if len(describedInstances) != len(ids) { - return nil, fmt.Errorf("expected %d instance(s), but got %d", len(ids), len(describedInstances)) + if len(describeInstancesOutput.Reservations) != 1 || len(describeInstancesOutput.Reservations[0].Instances) != 1 { + return nil, fmt.Errorf("expected instance but got 0") } + instance := describeInstancesOutput.Reservations[0].Instances[0] if injection.GetOptions(ctx).GetAWSNodeNameConvention() == options.ResourceName { - return describedInstances, nil + return instance, nil } - - instances := []*ec2.Instance{} - for _, instance := range describedInstances { - if len(aws.StringValue(instance.PrivateDnsName)) == 0 { - err = multierr.Append(err, fmt.Errorf("got instance %s but PrivateDnsName was not set", aws.StringValue(instance.InstanceId))) - continue - } - instances = append(instances, instance) + if len(aws.StringValue(instance.PrivateDnsName)) == 0 { + return nil, multierr.Append(err, fmt.Errorf("got instance %s but PrivateDnsName was not set", aws.StringValue(instance.InstanceId))) } - return instances, err + return instance, nil } -func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Instance, instanceTypes []cloudprovider.InstanceType) (*v1.Node, error) { +func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Instance, instanceTypes []cloudprovider.InstanceType) *v1.Node { for _, instanceType := range instanceTypes { if instanceType.Name() == aws.StringValue(instance.InstanceType) { nodeName := strings.ToLower(aws.StringValue(instance.PrivateDnsName)) @@ -310,10 +288,10 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins OperatingSystem: v1alpha5.OperatingSystemLinux, }, }, - }, nil + } } } - return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType)) + panic(fmt.Sprintf("unrecognized instance type %s", aws.StringValue(instance.InstanceType))) } func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []*ec2.CreateFleetError, capacityType string) { @@ -340,6 +318,26 @@ func (p *InstanceProvider) getCapacityType(constraints *v1alpha1.Constraints, in return v1alpha1.CapacityTypeOnDemand } +// filterInstanceTypes is used to eliminate GPU instance types from the list of possible instance types when a +// non-GPU instance type will work. If the list of instance types consists of both GPU and non-GPU types, then only +// the non-GPU types will be returned. If it has only GPU types, the list will be returned unaltered. +func (p *InstanceProvider) filterInstanceTypes(instanceTypes []cloudprovider.InstanceType) []cloudprovider.InstanceType { + var genericInstanceTypes []cloudprovider.InstanceType + for _, it := range instanceTypes { + itRes := it.Resources() + if resources.IsZero(itRes[v1alpha1.ResourceAWSNeuron]) && + resources.IsZero(itRes[v1alpha1.ResourceAMDGPU]) && + resources.IsZero(itRes[v1alpha1.ResourceNVIDIAGPU]) { + genericInstanceTypes = append(genericInstanceTypes, it) + } + } + // if we got some subset of non-GPU types, then prefer to use those + if len(genericInstanceTypes) != 0 { + return genericInstanceTypes + } + return instanceTypes +} + func getInstanceID(node *v1.Node) (*string, error) { id := strings.Split(node.Spec.ProviderID, "/") if len(id) < 5 { @@ -365,19 +363,3 @@ func getCapacityType(instance *ec2.Instance) string { } return v1alpha1.CapacityTypeOnDemand } - -func combineFleetInstances(createFleetOutput ec2.CreateFleetOutput) []*string { - instanceIds := []*string{} - for _, reservation := range createFleetOutput.Instances { - instanceIds = append(instanceIds, reservation.InstanceIds...) - } - return instanceIds -} - -func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { - instances := []*ec2.Instance{} - for _, reservation := range reservations { - instances = append(instances, reservation.Instances...) - } - return instances -} diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index e400a0100c56..f06d9881ba06 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -17,6 +17,7 @@ package aws import ( "context" "fmt" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -42,6 +43,7 @@ const ( ) type InstanceTypeProvider struct { + sync.Mutex ec2api ec2iface.EC2API subnetProvider *SubnetProvider // Has two entries: one for all the instance types and one for all zones; values cached *before* considering insufficient capacity errors @@ -62,6 +64,8 @@ func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvi // Get all instance type options (the constraints are only used for tag filtering on subnets, not for Requirements filtering) func (p *InstanceTypeProvider) Get(ctx context.Context, provider *v1alpha1.AWS) ([]cloudprovider.InstanceType, error) { + p.Lock() + defer p.Unlock() // Get InstanceTypes from EC2 instanceTypes, err := p.getInstanceTypes(ctx) if err != nil { diff --git a/pkg/cloudprovider/aws/launchtemplate.go b/pkg/cloudprovider/aws/launchtemplate.go index e0c979c532d8..900120719abc 100644 --- a/pkg/cloudprovider/aws/launchtemplate.go +++ b/pkg/cloudprovider/aws/launchtemplate.go @@ -80,6 +80,8 @@ func launchTemplateName(options *amifamily.LaunchTemplate) string { } func (p *LaunchTemplateProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, additionalLabels map[string]string) (map[string][]cloudprovider.InstanceType, error) { + p.Lock() + defer p.Unlock() // If Launch Template is directly specified then just use it if constraints.LaunchTemplateName != nil { return map[string][]cloudprovider.InstanceType{ptr.StringValue(constraints.LaunchTemplateName): instanceTypes}, nil @@ -124,10 +126,6 @@ func (p *LaunchTemplateProvider) Get(ctx context.Context, constraints *v1alpha1. } func (p *LaunchTemplateProvider) ensureLaunchTemplate(ctx context.Context, options *amifamily.LaunchTemplate) (*ec2.LaunchTemplate, error) { - // Ensure that multiple threads don't attempt to create the same launch template - p.Lock() - defer p.Unlock() - var launchTemplate *ec2.LaunchTemplate name := launchTemplateName(options) // Read from cache diff --git a/pkg/cloudprovider/aws/securitygroups.go b/pkg/cloudprovider/aws/securitygroups.go index b5565ab6ed5c..8e8c6f9d119b 100644 --- a/pkg/cloudprovider/aws/securitygroups.go +++ b/pkg/cloudprovider/aws/securitygroups.go @@ -17,6 +17,7 @@ package aws import ( "context" "fmt" + "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -29,6 +30,7 @@ import ( ) type SecurityGroupProvider struct { + sync.Mutex ec2api ec2iface.EC2API cache *cache.Cache } @@ -41,6 +43,8 @@ func NewSecurityGroupProvider(ec2api ec2iface.EC2API) *SecurityGroupProvider { } func (p *SecurityGroupProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]string, error) { + p.Lock() + defer p.Unlock() // Get SecurityGroups securityGroups, err := p.getSecurityGroups(ctx, p.getFilters(constraints)) if err != nil { diff --git a/pkg/cloudprovider/aws/subnets.go b/pkg/cloudprovider/aws/subnets.go index bc65d804b0ae..15a3d45e37b2 100644 --- a/pkg/cloudprovider/aws/subnets.go +++ b/pkg/cloudprovider/aws/subnets.go @@ -17,6 +17,7 @@ package aws import ( "context" "fmt" + "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -30,6 +31,7 @@ import ( ) type SubnetProvider struct { + sync.Mutex ec2api ec2iface.EC2API cache *cache.Cache } @@ -42,6 +44,8 @@ func NewSubnetProvider(ec2api ec2iface.EC2API) *SubnetProvider { } func (p *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.AWS) ([]*ec2.Subnet, error) { + p.Lock() + defer p.Unlock() filters := getFilters(constraints) hash, err := hashstructure.Hash(filters, hashstructure.FormatV2, nil) if err != nil { diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 38c99fe859b2..5930b2f2aed5 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/Pallinder/go-randomdata" - "go.uber.org/multierr" "knative.dev/pkg/apis" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" @@ -41,7 +40,7 @@ type CloudProvider struct { // CreateCalls contains the arguments for every create call that was made since it was cleared mu sync.Mutex - CreateCalls []CreateCallArgs + CreateCalls []*cloudprovider.NodeRequest } type CreateCallArgs struct { @@ -50,50 +49,44 @@ type CreateCallArgs struct { Quantity int } -func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, bind func(*v1.Node) error) error { +func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { c.mu.Lock() - c.CreateCalls = append(c.CreateCalls, CreateCallArgs{constraints, instanceTypes, quantity}) + c.CreateCalls = append(c.CreateCalls, nodeRequest) c.mu.Unlock() - - var err error - for i := 0; i < quantity; i++ { - name := strings.ToLower(randomdata.SillyName()) - instance := instanceTypes[0] - var zone, capacityType string - for _, o := range instance.Offerings() { - if constraints.Requirements.CapacityTypes().Has(o.CapacityType) && constraints.Requirements.Zones().Has(o.Zone) { - zone = o.Zone - capacityType = o.CapacityType - break - } + name := strings.ToLower(randomdata.SillyName()) + instance := nodeRequest.InstanceTypeOptions[0] + var zone, capacityType string + for _, o := range instance.Offerings() { + if nodeRequest.Constraints.Requirements.CapacityTypes().Has(o.CapacityType) && nodeRequest.Constraints.Requirements.Zones().Has(o.Zone) { + zone = o.Zone + capacityType = o.CapacityType + break } - - err = multierr.Append(err, bind(&v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - v1.LabelTopologyZone: zone, - v1.LabelInstanceTypeStable: instance.Name(), - v1alpha5.LabelCapacityType: capacityType, - }, + } + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1.LabelTopologyZone: zone, + v1.LabelInstanceTypeStable: instance.Name(), + v1alpha5.LabelCapacityType: capacityType, }, - Spec: v1.NodeSpec{ - ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone), + }, + Spec: v1.NodeSpec{ + ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone), + }, + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + Architecture: instance.Architecture(), + OperatingSystem: v1alpha5.OperatingSystemLinux, }, - Status: v1.NodeStatus{ - NodeInfo: v1.NodeSystemInfo{ - Architecture: instance.Architecture(), - OperatingSystem: v1alpha5.OperatingSystemLinux, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: instance.Resources()[v1.ResourcePods], - v1.ResourceCPU: instance.Resources()[v1.ResourceCPU], - v1.ResourceMemory: instance.Resources()[v1.ResourceMemory], - }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: instance.Resources()[v1.ResourcePods], + v1.ResourceCPU: instance.Resources()[v1.ResourceCPU], + v1.ResourceMemory: instance.Resources()[v1.ResourceMemory], }, - })) - } - return err + }, + }, nil } func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provider) ([]cloudprovider.InstanceType, error) { diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go index 71b25c07998f..be733b57032b 100644 --- a/pkg/cloudprovider/metrics/cloudprovider.go +++ b/pkg/cloudprovider/metrics/cloudprovider.go @@ -67,9 +67,9 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv return &decorator{cloudProvider} } -func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { +func (d *decorator) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "Create", d.Name()))() - return d.CloudProvider.Create(ctx, constraints, instanceTypes, quantity, callback) + return d.CloudProvider.Create(ctx, nodeRequest) } func (d *decorator) Delete(ctx context.Context, node *v1.Node) error { diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 7bf7e0f0a7ea..f8fbabdce121 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -25,13 +25,18 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" ) +// Options are injected into cloud providers' factories +type Options struct { + ClientSet *kubernetes.Clientset +} + // CloudProvider interface is implemented by cloud providers to support provisioning. type CloudProvider interface { - // Create a set of nodes for each of the given constraints. This API uses a + // Create a node given constraints and instance type options. This API uses a // callback pattern to enable cloudproviders to batch capacity creation // requests. The callback must be called with a theoretical node object that // is fulfilled by the cloud providers capacity creation request. - Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) error + Create(context.Context, *NodeRequest) (*v1.Node, error) // Delete node in cloudprovider Delete(context.Context, *v1.Node) error // GetInstanceTypes returns instance types supported by the cloudprovider. @@ -45,9 +50,9 @@ type CloudProvider interface { Name() string } -// Options are injected into cloud providers' factories -type Options struct { - ClientSet *kubernetes.Clientset +type NodeRequest struct { + Constraints *v1alpha5.Constraints + InstanceTypeOptions []InstanceType } // InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index f894a038b72b..a5bc9ce9e038 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -21,6 +21,7 @@ import ( "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -150,22 +151,21 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai if err := p.Spec.Limits.ExceededBy(latest.Status.Resources); err != nil { return err } - // Create and Bind - pods := make(chan []*v1.Pod, len(packing.Pods)) - defer close(pods) - for _, ps := range packing.Pods { - pods <- ps - } - return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { - if err := mergo.Merge(node, constraints.ToNode()); err != nil { - return fmt.Errorf("merging cloud provider node, %w", err) - } - return p.bind(ctx, node, <-pods) + errs := make([]error, packing.NodeQuantity) + workqueue.ParallelizeUntil(ctx, packing.NodeQuantity, packing.NodeQuantity, func(i int) { + errs[i] = p.create(ctx, &cloudprovider.NodeRequest{Constraints: constraints, InstanceTypeOptions: packing.InstanceTypeOptions}, packing.Pods[i]) }) + return multierr.Combine(errs...) } -func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { - defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() +func (p *Provisioner) create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest, pods []*v1.Pod) error { + node, err := p.cloudProvider.Create(ctx, nodeRequest) + if err != nil { + return fmt.Errorf("creating cloud provider machine, %w", err) + } + if err := mergo.Merge(node, nodeRequest.Constraints.ToNode()); err != nil { + return fmt.Errorf("merging cloud provider node, %w", err) + } // Idempotently create a node. In rare cases, nodes can come online and // self register before the controller is able to register a node object // with the API server. In the common case, we create the node object @@ -176,6 +176,14 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) ( return fmt.Errorf("creating node %s, %w", node.Name, err) } } + if err := p.bind(ctx, node, pods); err != nil { + return fmt.Errorf("binding pods, %w", err) + } + return nil +} + +func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { + defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Bind pods var bound int64 workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { diff --git a/pkg/controllers/provisioning/scheduling/instance_selection_test.go b/pkg/controllers/provisioning/scheduling/instance_selection_test.go index 8fb51658cbfd..fa5a0e62c4be 100644 --- a/pkg/controllers/provisioning/scheduling/instance_selection_test.go +++ b/pkg/controllers/provisioning/scheduling/instance_selection_test.go @@ -16,6 +16,9 @@ package scheduling_test import ( "fmt" + "math" + "math/rand" + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" @@ -28,8 +31,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" - "math" - "math/rand" ) var _ = Describe("Instance Type Selection", func() { @@ -82,7 +83,7 @@ var _ = Describe("Instance Type Selection", func() { node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) // ensure that the entire list of instance types match the label - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, v1alpha5.ArchitectureAmd64) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, v1alpha5.ArchitectureAmd64) }) It("should schedule on one of the cheapest instances (pod arch = arm64)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -93,7 +94,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, v1alpha5.ArchitectureArm64) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, v1alpha5.ArchitectureArm64) }) It("should schedule on one of the cheapest instances (prov arch = amd64)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -106,7 +107,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, v1alpha5.ArchitectureAmd64) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, v1alpha5.ArchitectureAmd64) }) It("should schedule on one of the cheapest instances (prov arch = arm64)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -119,7 +120,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, v1alpha5.ArchitectureArm64) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, v1alpha5.ArchitectureArm64) }) It("should schedule on one of the cheapest instances (prov os = windows)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -132,7 +133,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "windows") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "windows") }) It("should schedule on one of the cheapest instances (pod os = windows)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -143,7 +144,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "windows") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "windows") }) It("should schedule on one of the cheapest instances (prov os = windows)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -156,7 +157,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "windows") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "windows") }) It("should schedule on one of the cheapest instances (pod os = linux)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -167,7 +168,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "linux") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "linux") }) It("should schedule on one of the cheapest instances (pod os = linux)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -178,7 +179,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "linux") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "linux") }) It("should schedule on one of the cheapest instances (prov zone = test-zone-2)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -191,7 +192,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelTopologyZone, "test-zone-2") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelTopologyZone, "test-zone-2") }) It("should schedule on one of the cheapest instances (pod zone = test-zone-2)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -202,7 +203,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelTopologyZone, "test-zone-2") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelTopologyZone, "test-zone-2") }) It("should schedule on one of the cheapest instances (prov ct = spot)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -215,7 +216,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot) }) It("should schedule on one of the cheapest instances (pod ct = spot)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -226,7 +227,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot) + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha5.LabelCapacityType, v1alpha1.CapacityTypeSpot) }) It("should schedule on one of the cheapest instances (prov ct = ondemand, prov zone = test-zone-1)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -244,7 +245,7 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeOnDemand, "test-zone-1") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeOnDemand, "test-zone-1") }) It("should schedule on one of the cheapest instances (pod ct = spot, pod zone = test-zone-1)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -261,7 +262,7 @@ var _ = Describe("Instance Type Selection", func() { }})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeSpot, "test-zone-1") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeSpot, "test-zone-1") }) It("should schedule on one of the cheapest instances (prov ct = spot, pod zone = test-zone-2)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -279,7 +280,7 @@ var _ = Describe("Instance Type Selection", func() { }}})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeSpot, "test-zone-2") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeSpot, "test-zone-2") }) It("should schedule on one of the cheapest instances (prov ct = ondemand/test-zone-1/arm64/windows)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -307,9 +308,9 @@ var _ = Describe("Instance Type Selection", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod()) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeOnDemand, "test-zone-1") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "windows") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, "arm64") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeOnDemand, "test-zone-1") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "windows") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, "arm64") }) It("should schedule on one of the cheapest instances (prov = spot/test-zone-2, pod = amd64/linux)", func() { provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{ @@ -339,9 +340,9 @@ var _ = Describe("Instance Type Selection", func() { }})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeSpot, "test-zone-2") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "linux") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, "amd64") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeSpot, "test-zone-2") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "linux") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, "amd64") }) It("should schedule on one of the cheapest instances (pod ct = spot/test-zone-2/amd64/linux)", func() { pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod( @@ -369,9 +370,9 @@ var _ = Describe("Instance Type Selection", func() { }})) node := ExpectScheduled(ctx, env.Client, pod[0]) Expect(nodePrice(node)).To(Equal(minPrice)) - ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypes, v1alpha1.CapacityTypeSpot, "test-zone-2") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelOSStable, "linux") - ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypes, v1.LabelArchStable, "amd64") + ExpectInstancesWithOffering(cloudProv.CreateCalls[0].InstanceTypeOptions, v1alpha1.CapacityTypeSpot, "test-zone-2") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelOSStable, "linux") + ExpectInstancesWithLabel(cloudProv.CreateCalls[0].InstanceTypeOptions, v1.LabelArchStable, "amd64") }) It("should not schedule if no instance type matches selector (pod arch = arm)", func() { // remove all Arm instance types @@ -468,7 +469,7 @@ var _ = Describe("Instance Type Selection", func() { // should fit on one node Expect(nodeNames).To(HaveLen(1)) totalPodResources := resources.RequestsForPods(pods...) - for _, it := range cloudProv.CreateCalls[0].InstanceTypes { + for _, it := range cloudProv.CreateCalls[0].InstanceTypeOptions { totalReserved := resources.Merge(totalPodResources, it.Overhead()) // the total pod resources in CPU and memory + instance overhead should always be less than the // resources available on every viable instance has diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 7242e21ffcf8..d97997e2eee7 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -1939,27 +1939,6 @@ var _ = Describe("Binpacking", func() { }}))[0] ExpectNotScheduled(ctx, env.Client, pod) }) - It("uses the create quantity argument for identical node creation", func() { - opts := test.PodOptions{ - NodeSelector: map[string]string{v1.LabelArchStable: "amd64"}, - Conditions: []v1.PodCondition{{Type: v1.PodScheduled, Reason: v1.PodReasonUnschedulable, Status: v1.ConditionFalse}}, - ResourceRequirements: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceMemory: resource.MustParse("1.8G"), - }, - }} - pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.Pods(40, opts)...) - nodeNames := sets.NewString() - for _, p := range pods { - node := ExpectScheduled(ctx, env.Client, p) - nodeNames.Insert(node.Name) - Expect(node.Labels[v1.LabelInstanceTypeStable]).To(Equal("default-instance-type")) - } - Expect(nodeNames).To(HaveLen(20)) - // should get one call with a quantity of 20 - Eventually(cloudProv.CreateCalls).Should(HaveLen(1)) - Expect(cloudProv.CreateCalls[0].Quantity).To(Equal(20)) - }) It("should create new nodes when a node is at capacity due to pod limits per node", func() { opts := test.PodOptions{ NodeSelector: map[string]string{v1.LabelArchStable: "amd64"}, @@ -2022,7 +2001,7 @@ var _ = Describe("Binpacking", func() { Expect(node.Labels[v1.LabelInstanceTypeStable]).To(Equal("large")) // all three options should be passed to the cloud provider possibleInstanceType := sets.NewString() - for _, it := range cloudProv.CreateCalls[0].InstanceTypes { + for _, it := range cloudProv.CreateCalls[0].InstanceTypeOptions { possibleInstanceType.Insert(it.Name()) } Expect(possibleInstanceType).To(Equal(sets.NewString("small", "medium", "large")))