Skip to content

Commit

Permalink
Removed cloudprovider awareness of binpacked pods (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn authored Aug 31, 2021
1 parent 42c4bda commit 209d5ed
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 49 deletions.
11 changes: 6 additions & 5 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ func withUserAgent(sess *session.Session) *session.Session {
}

// Create a node given the constraints.
func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provisioner, packing *cloudprovider.Packing, callback func(*v1.Node) error) chan error {

func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provisioner, constraints *v1alpha3.Constraints, instanceTypes []cloudprovider.InstanceType, callback func(*v1.Node) error) chan error {
return c.creationQueue.Add(func() error {
return c.create(ctx, provisioner, packing, callback)
return c.create(ctx, provisioner, constraints, instanceTypes, callback)
})
}

func (c *CloudProvider) create(ctx context.Context, provisioner *v1alpha3.Provisioner, packing *cloudprovider.Packing, callback func(*v1.Node) error) error {
constraints := Constraints(*packing.Constraints)
func (c *CloudProvider) create(ctx context.Context, provisioner *v1alpha3.Provisioner, v1alpha3constraints *v1alpha3.Constraints, instanceTypes []cloudprovider.InstanceType, callback func(*v1.Node) error) error {
constraints := Constraints(*v1alpha3constraints)
// 1. Get Subnets and constrain by zones
subnets, err := c.subnetProvider.Get(ctx, provisioner, &constraints)
if err != nil {
Expand All @@ -138,7 +139,7 @@ func (c *CloudProvider) create(ctx context.Context, provisioner *v1alpha3.Provis
return fmt.Errorf("getting launch template, %w", err)
}
// 3. Create instance
node, err := c.instanceProvider.Create(ctx, launchTemplate, packing.InstanceTypeOptions, subnets, constraints.GetCapacityType())
node, err := c.instanceProvider.Create(ctx, launchTemplate, instanceTypes, subnets, constraints.GetCapacityType())
if err != nil {
return fmt.Errorf("launching instance, %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/packing"
"github.com/awslabs/karpenter/pkg/test"
. "github.com/awslabs/karpenter/pkg/test/expectations"
"github.com/awslabs/karpenter/pkg/utils/parallel"
Expand Down Expand Up @@ -82,7 +82,7 @@ var _ = BeforeSuite(func() {
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()},
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: packing.NewPacker(),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: e.Client,
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

type CloudProvider struct{}

func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provisioner, packing *cloudprovider.Packing, bind func(*v1.Node) error) chan error {
func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provisioner, constraints *v1alpha3.Constraints, instanceTypes []cloudprovider.InstanceType, bind func(*v1.Node) error) chan error {
name := strings.ToLower(randomdata.SillyName())
// Pick first instance type option
instance := packing.InstanceTypeOptions[0]
instance := instanceTypes[0]
// Pick first zone
zones := instance.Zones()
if len(packing.Constraints.Zones) != 0 {
zones = functional.IntersectStringSlice(packing.Constraints.Zones, instance.Zones())
if len(constraints.Zones) != 0 {
zones = functional.IntersectStringSlice(constraints.Zones, instance.Zones())
}
zone := zones[0]

Expand All @@ -48,11 +48,11 @@ func (c *CloudProvider) Create(ctx context.Context, provisioner *v1alpha3.Provis
err <- bind(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: packing.Constraints.Labels,
Labels: constraints.Labels,
},
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone),
Taints: packing.Constraints.Taints,
Taints: constraints.Taints,
},
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
Expand Down
19 changes: 1 addition & 18 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type CloudProvider interface {
// requests. The callback must be called with a theoretical node object that
// is fulfilled by the cloud providers capacity creation request. This API
// is called in parallel and then waits for all channels to return nil or error.
Create(context.Context, *v1alpha3.Provisioner, *Packing, func(*v1.Node) error) chan error
Create(context.Context, *v1alpha3.Provisioner, *v1alpha3.Constraints, []InstanceType, func(*v1.Node) error) chan error
// GetInstanceTypes returns the instance types supported by the cloud
// provider limited by the provided constraints and daemons.
GetInstanceTypes(context.Context) ([]InstanceType, error)
Expand All @@ -50,23 +50,6 @@ type CloudProvider interface {
Terminate(context.Context, *v1.Node) error
}

// Packing is a binpacking solution of equivalently schedulable pods to a set of
// viable instance types upon which they fit. All pods in the packing are
// within the specified constraints (e.g., labels, taints).
type Packing struct {
Pods []*v1.Pod
InstanceTypeOptions []InstanceType
Constraints *v1alpha3.Constraints
}

// PackedNode is a node object and the pods that should be bound to it. It is
// expected that the pods in a cloudprovider.Packing will be equivalent to the
// pods in a cloudprovider.PackedNode.
type PackedNode struct {
*v1.Node
Pods []*v1.Pod
}

// Options are injected into cloud providers' factories
type Options struct {
ClientSet *kubernetes.Clientset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package packing
package binpacking

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package packing
package binpacking

import (
"context"
"math"
"sort"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/utils/apiobject"
Expand All @@ -38,26 +39,35 @@ type packer struct{}

// Packer helps pack the pods and calculates efficient placement on the instances.
type Packer interface {
Pack(context.Context, *scheduling.Schedule, []cloudprovider.InstanceType) []*cloudprovider.Packing
Pack(context.Context, *scheduling.Schedule, []cloudprovider.InstanceType) []*Packing
}

// NewPacker returns a Packer implementation
func NewPacker() Packer {
return &packer{}
}

// Packing is a binpacking solution of equivalently schedulable pods to a set of
// viable instance types upon which they fit. All pods in the packing are
// within the specified constraints (e.g., labels, taints).
type Packing struct {
Pods []*v1.Pod
InstanceTypeOptions []cloudprovider.InstanceType
Constraints *v1alpha3.Constraints
}

// Pack returns the node packings for the provided pods. It computes a set of viable
// instance types for each packing of pods. InstanceType variety enables the cloud provider
// to make better cost and availability decisions. The instance types returned are sorted by resources.
// Pods provided are all schedulable in the same zone as tightly as possible.
// It follows the First Fit Decreasing bin packing technique, reference-
// https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD)
func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*cloudprovider.Packing {
func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing {
// Sort pods in decreasing order by the amount of CPU requested, if
// CPU requested is equal compare memory requested.
sort.Sort(sort.Reverse(ByResourcesRequested{SortablePods: schedule.Pods}))
var packings []*cloudprovider.Packing
var packing *cloudprovider.Packing
var packings []*Packing
var packing *Packing
remainingPods := schedule.Pods
for len(remainingPods) > 0 {

Expand All @@ -77,7 +87,7 @@ func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan
// packWithLargestPod will try to pack max number of pods with largest pod in
// pods across all available node capacities. It returns Packing: max pod count
// that fit; with their node capacities and list of leftover pods
func (p *packer) packWithLargestPod(ctx context.Context, unpackedPods []*v1.Pod, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) (*cloudprovider.Packing, []*v1.Pod) {
func (p *packer) packWithLargestPod(ctx context.Context, unpackedPods []*v1.Pod, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) (*Packing, []*v1.Pod) {
bestPackedPods := []*v1.Pod{}
bestInstances := []cloudprovider.InstanceType{}
remainingPods := unpackedPods
Expand Down Expand Up @@ -105,7 +115,7 @@ func (p *packer) packWithLargestPod(ctx context.Context, unpackedPods []*v1.Pod,
if len(bestInstances) > MaxInstanceTypes {
bestInstances = bestInstances[:MaxInstanceTypes]
}
return &cloudprovider.Packing{Pods: bestPackedPods, Constraints: schedule.Constraints, InstanceTypeOptions: bestInstances}, remainingPods
return &Packing{Pods: bestPackedPods, Constraints: schedule.Constraints, InstanceTypeOptions: bestInstances}, remainingPods
}

func (*packer) podsMatch(first, second []*v1.Pod) bool {
Expand Down
11 changes: 5 additions & 6 deletions pkg/controllers/allocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/packing"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/awslabs/karpenter/pkg/utils/result"
"go.uber.org/multierr"
Expand Down Expand Up @@ -57,7 +57,7 @@ type Controller struct {
Filter *Filter
Binder *Binder
Scheduler *scheduling.Scheduler
Packer packing.Packer
Packer binpacking.Packer
CloudProvider cloudprovider.CloudProvider
KubeClient client.Client
}
Expand All @@ -69,7 +69,7 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface
Binder: &Binder{KubeClient: kubeClient, CoreV1Client: coreV1Client},
Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout),
Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient),
Packer: packing.NewPacker(),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: kubeClient,
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
}

// 6. Binpack each group
packings := []*cloudprovider.Packing{}
packings := []*binpacking.Packing{}
for _, schedule := range schedules {
packings = append(packings, c.Packer.Pack(ctx, schedule, instanceTypes)...)
}
Expand All @@ -123,12 +123,11 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
errs := make([]error, len(packings))
workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(index int) {
packing := packings[index]
errs[index] = <-c.CloudProvider.Create(ctx, provisioner, packing, func(node *v1.Node) error {
errs[index] = <-c.CloudProvider.Create(ctx, provisioner, packing.Constraints, packing.InstanceTypeOptions, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(
map[string]string{v1alpha3.ProvisionerNameLabelKey: provisioner.Name},
packing.Constraints.Labels,
)
node.Spec.Taints = packing.Constraints.Taints
return c.Binder.Bind(ctx, node, packing.Pods)
})
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/allocation/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/packing"
"github.com/awslabs/karpenter/pkg/test"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -58,7 +58,7 @@ var _ = BeforeSuite(func() {
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: packing.NewPacker(),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: e.Client,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/allocation/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/allocation"
"github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking"
"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
"github.com/awslabs/karpenter/pkg/packing"
"github.com/awslabs/karpenter/pkg/test"
"knative.dev/pkg/ptr"

Expand Down Expand Up @@ -61,7 +61,7 @@ var _ = BeforeSuite(func() {
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: packing.NewPacker(),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: e.Client,
}
Expand Down

0 comments on commit 209d5ed

Please sign in to comment.