Skip to content

Commit

Permalink
Parallelize packing and launching (#1222)
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn authored Jan 25, 2022
1 parent b95b51f commit da85afd
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 24 deletions.
5 changes: 4 additions & 1 deletion pkg/apis/provisioning/v1alpha5/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ type Constraints struct {
KubeletConfiguration KubeletConfiguration `json:"kubeletConfiguration,omitempty"`
// Provider contains fields specific to your cloudprovider.
// +kubebuilder:pruning:PreserveUnknownFields
Provider *runtime.RawExtension `json:"provider,omitempty"`
Provider *Provider `json:"provider,omitempty"`
}

// +kubebuilder:object:generate=false
type Provider = runtime.RawExtension

// ValidatePod returns an error if the pod's requirements are not met by the constraints
func (c *Constraints) ValidatePod(pod *v1.Pod) error {
// Tolerate Taints
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constr
}

// GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements)
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) {
vendorConstraints, err := v1alpha1.Deserialize(constraints)
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, provider *v1alpha5.Provider) ([]cloudprovider.InstanceType, error) {
vendorConstraints, err := v1alpha1.Deserialize(&v1alpha5.Constraints{Provider: provider})
if err != nil {
return nil, apis.ErrGeneric(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai
return err
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) {
func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provider) ([]cloudprovider.InstanceType, error) {
if c.InstanceTypes != nil {
return c.InstanceTypes, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (d *decorator) Delete(ctx context.Context, node *v1.Node) error {
return d.CloudProvider.Delete(ctx, node)
}

func (d *decorator) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) {
func (d *decorator) GetInstanceTypes(ctx context.Context, provider *v1alpha5.Provider) ([]cloudprovider.InstanceType, error) {
defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(getControllerName(ctx), "GetInstanceTypes", d.Name()))()
return d.CloudProvider.GetInstanceTypes(ctx, constraints)
return d.CloudProvider.GetInstanceTypes(ctx, provider)
}

func (d *decorator) Default(ctx context.Context, constraints *v1alpha5.Constraints) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type CloudProvider interface {
Delete(context.Context, *v1.Node) error
// GetInstanceTypes returns instance types supported by the cloudprovider.
// Availability of types or zone may vary by provisioner or over time.
GetInstanceTypes(context.Context, *v1alpha5.Constraints) ([]InstanceType, error)
GetInstanceTypes(context.Context, *v1alpha5.Provider) ([]InstanceType, error)
// Default is a hook for additional defaulting logic at webhook time.
Default(context.Context, *v1alpha5.Constraints)
// Validate is a hook for additional validation logic at webhook time.
Expand Down
8 changes: 1 addition & 7 deletions pkg/controllers/provisioning/binpacking/packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,8 @@ type Packing struct {
// Pods provided are all schedulable in the same zone as tightly as possible.
// It follows the First Fit Decreasing bin packing technique, reference-
// https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD)
func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) ([]*Packing, error) {
func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod, instanceTypes []cloudprovider.InstanceType) ([]*Packing, error) {
defer metrics.Measure(packDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting instance types, %w", err)
}
// Get daemons for overhead calculations
daemons, err := p.getDaemons(ctx, constraints)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/binpacking/packer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BenchmarkPacker(b *testing.B) {

// Pack benchmark
for i := 0; i < b.N; i++ {
if packings, err := packer.Pack(ctx, schedule.Constraints, pods); err != nil || len(packings) == 0 {
if packings, err := packer.Pack(ctx, schedule.Constraints, pods, instanceTypes); err != nil || len(packings) == 0 {
b.FailNow()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Controller) Delete(name string) {
// Apply creates or updates the provisioner to the latest configuration
func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisioner) error {
// Refresh global requirements using instance type availability
instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, &provisioner.Spec.Constraints)
instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, provisioner.Spec.Provider)
if err != nil {
return err
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,25 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, p.Spec.Provider)
if err != nil {
return fmt.Errorf("getting instance types, %w", err)
}
// Launch capacity and bind pods
for _, schedule := range schedules {
packings, err := p.packer.Pack(ctx, schedule.Constraints, schedule.Pods)
workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(i int) {
packings, err := p.packer.Pack(ctx, schedules[i].Constraints, schedules[i].Pods, instanceTypes)
if err != nil {
return fmt.Errorf("binpacking pods, %w", err)
logging.FromContext(ctx).Errorf("Could not pack pods, %s", err.Error())
return
}
for _, packing := range packings {
if err := p.launch(ctx, schedule.Constraints, packing); err != nil {
workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(j int) {
if err := p.launch(ctx, schedules[i].Constraints, packings[j]); err != nil {
logging.FromContext(ctx).Errorf("Could not launch node, %s", err.Error())
continue
return
}
}
}
})
})
return nil
}

Expand Down

0 comments on commit da85afd

Please sign in to comment.