From 4b4ce9408121a82f339f0398b2af62a4acb77685 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Mon, 20 Dec 2021 09:56:17 -0600 Subject: [PATCH 1/3] fix CloudProvider metric --- pkg/cloudprovider/metrics/cloudprovider.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go index 8ffac52399bb..ed8f7be5cc27 100644 --- a/pkg/cloudprovider/metrics/cloudprovider.go +++ b/pkg/cloudprovider/metrics/cloudprovider.go @@ -67,9 +67,10 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv } func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) <-chan error { + recordLatency := metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name())) out := make(chan error) go func(in <-chan error) { - defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name()))() + defer recordLatency() select { case err := <-in: out <- err From 75492a4c58414ceeb925cdbb7c09a6104fb85091 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Mon, 20 Dec 2021 13:16:35 -0600 Subject: [PATCH 2/3] refactor `CloudProvider.Create()` to return an `error` --- pkg/cloudprovider/aws/cloudprovider.go | 8 +--- pkg/cloudprovider/fake/cloudprovider.go | 49 ++++++++++----------- pkg/cloudprovider/metrics/cloudprovider.go | 16 ++----- pkg/cloudprovider/types.go | 5 +-- pkg/controllers/provisioning/provisioner.go | 2 +- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index a48f80cc3436..3f4c3885f823 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -108,13 +108,7 @@ func withUserAgent(sess *session.Session) *session.Session { } // Create a node given the constraints. -func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) <-chan error { - return c.creationQueue.Add(func() error { - return c.create(ctx, constraints, instanceTypes, quantity, callback) - }) -} - -func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { +func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { return err diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index fcdec5c5912a..125c01cfb1ff 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -22,6 +22,7 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" + "go.uber.org/multierr" "knative.dev/pkg/apis" v1 "k8s.io/api/core/v1" @@ -33,8 +34,8 @@ type CloudProvider struct { InstanceTypes []cloudprovider.InstanceType } -func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, bind func(*v1.Node) error) <-chan error { - err := make(chan error) +func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, bind func(*v1.Node) error) error { + var err error for i := 0; i < quantity; i++ { name := strings.ToLower(randomdata.SillyName()) instance := instanceTypes[0] @@ -49,32 +50,30 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai } } - go func() { - err <- bind(&v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - v1.LabelTopologyZone: zone, - v1.LabelInstanceTypeStable: instance.Name(), - v1alpha5.LabelCapacityType: capacityType, - }, + err = multierr.Append(err, bind(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1.LabelTopologyZone: zone, + v1.LabelInstanceTypeStable: instance.Name(), + v1alpha5.LabelCapacityType: capacityType, }, - Spec: v1.NodeSpec{ - ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone), + }, + Spec: v1.NodeSpec{ + ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone), + }, + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + Architecture: instance.Architecture(), + OperatingSystem: v1alpha5.OperatingSystemLinux, }, - Status: v1.NodeStatus{ - NodeInfo: v1.NodeSystemInfo{ - Architecture: instance.Architecture(), - OperatingSystem: v1alpha5.OperatingSystemLinux, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *instance.Pods(), - v1.ResourceCPU: *instance.CPU(), - v1.ResourceMemory: *instance.Memory(), - }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *instance.Pods(), + v1.ResourceCPU: *instance.CPU(), + v1.ResourceMemory: *instance.Memory(), }, - }) - }() + }, + })) } return err } diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go index ed8f7be5cc27..620bc0c5fb3e 100644 --- a/pkg/cloudprovider/metrics/cloudprovider.go +++ b/pkg/cloudprovider/metrics/cloudprovider.go @@ -66,19 +66,9 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv return &decorator{cloudProvider} } -func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) <-chan error { - recordLatency := metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name())) - out := make(chan error) - go func(in <-chan error) { - defer recordLatency() - select { - case err := <-in: - out <- err - case <-ctx.Done(): - } - close(out) - }(d.CloudProvider.Create(ctx, constraints, instanceTypes, quantity, callback)) - return out +func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { + defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name()))() + return d.CloudProvider.Create(ctx, constraints, instanceTypes, quantity, callback) } func (d *decorator) Delete(ctx context.Context, node *v1.Node) error { diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 332beee40e00..88620e921a88 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -30,9 +30,8 @@ type CloudProvider interface { // Create a set of nodes for each of the given constraints. This API uses a // callback pattern to enable cloudproviders to batch capacity creation // requests. The callback must be called with a theoretical node object that - // is fulfilled by the cloud providers capacity creation request. This API - // is called in parallel and then waits for all channels to return nil or error. - Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) <-chan error + // is fulfilled by the cloud providers capacity creation request. + Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) error // Delete node in cloudprovider Delete(context.Context, *v1.Node) error // GetInstanceTypes returns instance types supported by the cloudprovider. diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 27f724fec3a9..6b38d1470336 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -199,7 +199,7 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai for _, ps := range packing.Pods { pods <- ps } - return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { + return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels) node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...) return p.bind(ctx, node, <-pods) From d62d1ee4d0314db34639dfceb793efac3fe387ba Mon Sep 17 00:00:00 2001 From: Jerad C Date: Mon, 20 Dec 2021 15:20:40 -0600 Subject: [PATCH 3/3] remove unused work queue --- pkg/cloudprovider/aws/cloudprovider.go | 3 --- pkg/cloudprovider/aws/suite_test.go | 2 -- 2 files changed, 5 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 3f4c3885f823..97090ca51f1c 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -28,7 +28,6 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" - "github.com/aws/karpenter/pkg/utils/parallel" "github.com/aws/karpenter/pkg/utils/project" "go.uber.org/multierr" @@ -59,7 +58,6 @@ type CloudProvider struct { instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider instanceProvider *InstanceProvider - creationQueue *parallel.WorkQueue } func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *CloudProvider { @@ -87,7 +85,6 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud NewSecurityGroupProvider(ec2api), ), }, - creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index eaf36b3e9409..d3a2d6fafd84 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -32,7 +32,6 @@ import ( . "github.com/aws/karpenter/pkg/test/expectations" "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/options" - "github.com/aws/karpenter/pkg/utils/parallel" "github.com/aws/karpenter/pkg/utils/resources" "github.com/patrickmn/go-cache" @@ -94,7 +93,6 @@ var _ = BeforeSuite(func() { cache: launchTemplateCache, }, }, - creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } registry.RegisterOrDie(ctx, cloudProvider) provisioners = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider)