Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix CloudProvider metric #1031

Merged
merged 3 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/utils/parallel"
"github.com/aws/karpenter/pkg/utils/project"

"go.uber.org/multierr"
Expand Down Expand Up @@ -59,7 +58,6 @@ type CloudProvider struct {
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
instanceProvider *InstanceProvider
creationQueue *parallel.WorkQueue
}

func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *CloudProvider {
Expand Down Expand Up @@ -87,7 +85,6 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
NewSecurityGroupProvider(ec2api),
),
},
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
}
}

Expand All @@ -108,13 +105,7 @@ func withUserAgent(sess *session.Session) *session.Session {
}

// Create a node given the constraints.
func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) <-chan error {
return c.creationQueue.Add(func() error {
return c.create(ctx, constraints, instanceTypes, quantity, callback)
})
}

func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error {
func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error {
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
return err
Expand Down
2 changes: 0 additions & 2 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
. "github.com/aws/karpenter/pkg/test/expectations"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"
"github.com/aws/karpenter/pkg/utils/parallel"
"github.com/aws/karpenter/pkg/utils/resources"
"github.com/patrickmn/go-cache"

Expand Down Expand Up @@ -94,7 +93,6 @@ var _ = BeforeSuite(func() {
cache: launchTemplateCache,
},
},
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
}
registry.RegisterOrDie(ctx, cloudProvider)
provisioners = provisioning.NewController(ctx, e.Client, clientSet.CoreV1(), cloudProvider)
Expand Down
49 changes: 24 additions & 25 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/Pallinder/go-randomdata"
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"go.uber.org/multierr"
"knative.dev/pkg/apis"

v1 "k8s.io/api/core/v1"
Expand All @@ -33,8 +34,8 @@ type CloudProvider struct {
InstanceTypes []cloudprovider.InstanceType
}

func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, bind func(*v1.Node) error) <-chan error {
err := make(chan error)
func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, bind func(*v1.Node) error) error {
var err error
for i := 0; i < quantity; i++ {
name := strings.ToLower(randomdata.SillyName())
instance := instanceTypes[0]
Expand All @@ -49,32 +50,30 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai
}
}

go func() {
err <- bind(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
v1.LabelTopologyZone: zone,
v1.LabelInstanceTypeStable: instance.Name(),
v1alpha5.LabelCapacityType: capacityType,
},
err = multierr.Append(err, bind(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
v1.LabelTopologyZone: zone,
v1.LabelInstanceTypeStable: instance.Name(),
v1alpha5.LabelCapacityType: capacityType,
},
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone),
},
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone),
},
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
Architecture: instance.Architecture(),
OperatingSystem: v1alpha5.OperatingSystemLinux,
},
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
Architecture: instance.Architecture(),
OperatingSystem: v1alpha5.OperatingSystemLinux,
},
Allocatable: v1.ResourceList{
v1.ResourcePods: *instance.Pods(),
v1.ResourceCPU: *instance.CPU(),
v1.ResourceMemory: *instance.Memory(),
},
Allocatable: v1.ResourceList{
v1.ResourcePods: *instance.Pods(),
v1.ResourceCPU: *instance.CPU(),
v1.ResourceMemory: *instance.Memory(),
},
})
}()
},
}))
}
return err
}
Expand Down
15 changes: 3 additions & 12 deletions pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,9 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv
return &decorator{cloudProvider}
}

func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) <-chan error {
out := make(chan error)
go func(in <-chan error) {
defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name()))()
select {
case err := <-in:
out <- err
case <-ctx.Done():
}
close(out)
}(d.CloudProvider.Create(ctx, constraints, instanceTypes, quantity, callback))
return out
func (d *decorator) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error {
defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "Create", d.Name()))()
return d.CloudProvider.Create(ctx, constraints, instanceTypes, quantity, callback)
}

func (d *decorator) Delete(ctx context.Context, node *v1.Node) error {
Expand Down
5 changes: 2 additions & 3 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ type CloudProvider interface {
// Create a set of nodes for each of the given constraints. This API uses a
// callback pattern to enable cloudproviders to batch capacity creation
// requests. The callback must be called with a theoretical node object that
// is fulfilled by the cloud providers capacity creation request. This API
// is called in parallel and then waits for all channels to return nil or error.
Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) <-chan error
// is fulfilled by the cloud providers capacity creation request.
Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) error
// Delete node in cloudprovider
Delete(context.Context, *v1.Node) error
// GetInstanceTypes returns instance types supported by the cloudprovider.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai
for _, ps := range packing.Pods {
pods <- ps
}
return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.bind(ctx, node, <-pods)
Expand Down