Skip to content

Commit

Permalink
Add end to end functionality for pending pods (#209)
Browse files Browse the repository at this point in the history
* add end to end functionality

* simplify functions after PR feedback

* simplify create fleet method

* fix comments, remove new line char and remove private create method
  • Loading branch information
prateekgogia authored Jan 28, 2021
1 parent 58896c9 commit 48f5646
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/examples/provisioner/provisioner.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: autoscaling.karpenter.sh/v1alpha1
apiVersion: provisioning.karpenter.sh/v1alpha1
kind: Provisioner
metadata:
name: example
Expand Down
102 changes: 54 additions & 48 deletions pkg/cloudprovider/aws/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ package aws

import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/log"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

type Capacity struct {
Expand All @@ -33,62 +39,62 @@ func NewCapacity(client ec2iface.EC2API) *Capacity {
}

// Create a set of nodes given the constraints
func (cp *Capacity) Create(ctx context.Context, constraints cloudprovider.CapacityConstraints) error {
// Convert contraints to the Node types and select the launch template
// TODO
func (cp *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) error {
// TODO Convert contraints to the Node types and select the launch template

// Create the desired number of instances based on desired capacity
config := defaultInstanceConfig("", "", cp.ec2Iface)
_ = config

// Set AvailabilityZone, subnet, capacity, on-demand or spot
// and validateAndCreate instances
if err := config.validateAndCreate(ctx); err != nil {
return err
}
return nil
}

type instanceConfig struct {
ec2Iface ec2iface.EC2API
templateConfig *ec2.FleetLaunchTemplateConfigRequest
capacitySpec *ec2.TargetCapacitySpecificationRequest
instanceID string
}

func defaultInstanceConfig(templateID, templateVersion string, client ec2iface.EC2API) *instanceConfig {
return &instanceConfig{
ec2Iface: client,
templateConfig: &ec2.FleetLaunchTemplateConfigRequest{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateId: aws.String(templateID),
Version: aws.String(templateVersion),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{
&ec2.FleetLaunchTemplateOverridesRequest{},
// Create the desired number of instances based on constraints
// create instances using EC2 fleet API
// TODO remove hard coded values
output, err := cp.ec2Iface.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{
{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateId: aws.String("lt-02f427483e1be00f5"),
Version: aws.String("$Latest"),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{
{
InstanceType: aws.String("m5.large"),
SubnetId: aws.String("subnet-03216d5a693377033"),
AvailabilityZone: aws.String("us-east-2a"),
},
},
},
},
capacitySpec: &ec2.TargetCapacitySpecificationRequest{
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand),
OnDemandTargetCapacity: aws.Int64(1),
TotalTargetCapacity: aws.Int64(1),
},
}
}

func (cfg *instanceConfig) validateAndCreate(ctx context.Context) error {
input := &ec2.CreateFleetInput{
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{cfg.templateConfig},
TargetCapacitySpecification: cfg.capacitySpec,
Type: aws.String(ec2.FleetTypeInstant),
}
if err := input.Validate(); err != nil {
return err
}
output, err := cfg.ec2Iface.CreateFleetWithContext(ctx, input)
Type: aws.String(ec2.FleetTypeInstant),
})
if err != nil {
return err
return fmt.Errorf("failed to create fleet %w", err)
}
// TODO Get instanceID from the output
_ = output
_ = cfg.instanceID
// _ = cfg.instanceID
zap.S().Infof("Successfully created a node in zone %v", constraints.Zone)
return nil
}

// calculateResourceListOrDie queries EC2 API and gets the CPU & Mem for a list of instance types
func calculateResourceListOrDie(client ec2iface.EC2API, instanceType []*string) map[string]v1.ResourceList {
output, err := client.DescribeInstanceTypes(
&ec2.DescribeInstanceTypesInput{
InstanceTypes: instanceType,
},
)
if err != nil {
log.PanicIfError(err, "Describe instance type request failed")
}
var instanceTypes = map[string]v1.ResourceList{}
for _, instance := range output.InstanceTypes {
resourceList := v1.ResourceList{
v1.ResourceCPU: resource.MustParse(strconv.FormatInt(*instance.VCpuInfo.DefaultVCpus, 10)),
v1.ResourceMemory: resource.MustParse(strconv.FormatInt(*instance.MemoryInfo.SizeInMiB, 10)),
}
instanceTypes[*instance.InstanceType] = resourceList
}
return instanceTypes
}
2 changes: 1 addition & 1 deletion pkg/cloudprovider/fake/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import (
type Capacity struct {
}

func (c *Capacity) Create(context.Context, cloudprovider.CapacityConstraints) error {
func (c *Capacity) Create(context.Context, *cloudprovider.CapacityConstraints) error {
return nil
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type NodeGroup interface {
// Capacity provisions a set of nodes that fulfill a set of constraints.
type Capacity interface {
// Create a set of nodes to fulfill the desired capacity given constraints.
Create(context.Context, CapacityConstraints) error
Create(context.Context, *CapacityConstraints) error
}

// CapacityConstraints lets the controller define the desired capacity,
Expand All @@ -70,7 +70,7 @@ type CapacityConstraints struct {
// Overhead resources per node from system resources such a kubelet and daemonsets.
Overhead v1.ResourceList
// Architecture constrains the underlying hardware architecture.
Architecture *Architecture
Architecture Architecture
}

// Architecture constrains the underlying node's compilation architecture.
Expand Down
69 changes: 60 additions & 9 deletions pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"fmt"

"github.com/awslabs/karpenter/pkg/cloudprovider"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

var _ Allocator = &GreedyAllocator{}
Expand All @@ -29,11 +31,13 @@ type GreedyAllocator struct {
Capacity cloudprovider.Capacity
}

//
// Allocate takes a list of unschedulable pods and creates nodes based on
// resources required, node selectors and zone balancing.
func (a *GreedyAllocator) Allocate(pods []*v1.Pod) error {
// 1. Separate pods into scheduling groups
groups := a.getSchedulingGroups(pods)

zap.S().Infof("Allocating %d pending pods from %d constraint groups", len(pods), len(groups))
// 2. Group pods into equally schedulable constraint group
for _, group := range groups {
if err := a.Capacity.Create(context.TODO(), group.Constraints); err != nil {
Expand All @@ -45,25 +49,72 @@ func (a *GreedyAllocator) Allocate(pods []*v1.Pod) error {

type SchedulingGroup struct {
Pods []*v1.Pod
Constraints cloudprovider.CapacityConstraints
Constraints *cloudprovider.CapacityConstraints
}

func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []SchedulingGroup {
groups := []SchedulingGroup{}

func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []*SchedulingGroup {
groups := []*SchedulingGroup{}
for _, pod := range pods {
added := false
for _, group := range groups {
if a.matchesGroup(pod, group) {
if a.matchesGroup(group, pod) {
addPodResourcesToList(group.Constraints.Resources, pod)
group.Pods = append(group.Pods, pod)
added = true
break
}
}
if added {
continue
}
groups = append(groups, schedulingGroupForPod(pod))
}

return groups
}

// TODO
func (a *GreedyAllocator) matchesGroup(pod *v1.Pod, group SchedulingGroup) bool {
return true
func (a *GreedyAllocator) matchesGroup(group *SchedulingGroup, pod *v1.Pod) bool {
return false
}

func schedulingGroupForPod(pod *v1.Pod) *SchedulingGroup {
group := &SchedulingGroup{
Constraints: &cloudprovider.CapacityConstraints{
Resources: calculateResourcesForPod(pod),
Overhead: calculateOverheadResources(),
Architecture: getSystemArchitecture(pod),
Zone: getAvalabiltyZoneForPod(pod),
},
}
return group
}

func calculateResourcesForPod(pod *v1.Pod) v1.ResourceList {
resourceList := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0"),
v1.ResourceMemory: resource.MustParse("0"),
}
addPodResourcesToList(resourceList, pod)
return resourceList
}

func addPodResourcesToList(resources v1.ResourceList, pod *v1.Pod) {
for _, container := range pod.Spec.Containers {
resources.Cpu().Add(*container.Resources.Limits.Cpu())
resources.Memory().Add(*container.Resources.Limits.Memory())
}
}

func calculateOverheadResources() v1.ResourceList {
//TODO
return v1.ResourceList{}
}

func getSystemArchitecture(pod *v1.Pod) cloudprovider.Architecture {
return cloudprovider.Linux386
}

func getAvalabiltyZoneForPod(pod *v1.Pod) string {
// TODO parse annotation/label from pod
return "us-east-2b"
}

0 comments on commit 48f5646

Please sign in to comment.