Skip to content

Commit

Permalink
Implemented cloud provider initialization (aws#208)
Browse files Browse the repository at this point in the history
* Implemented Cloud Provider initialization and discovery logic.

* Implemented cloud provider initialization
  • Loading branch information
ellistarn committed Feb 4, 2021
1 parent ed9f1c5 commit 14342da
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 98 deletions.
8 changes: 3 additions & 5 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
Port: options.WebhookPort,
})

cloudProviderFactory := registry.NewFactory(cloudprovider.Options{Client: manager.GetClient()})
cloudProviderFactory := registry.NewFactory(cloudprovider.Options{Client: manager.GetClient(), Config: manager.GetConfig()})
metricsProducerFactory := &producers.Factory{Client: manager.GetClient(), CloudProviderFactory: cloudProviderFactory}
metricsClientFactory := metricsclients.NewFactoryOrDie(options.PrometheusURI)
autoscalerFactory := autoscaler.NewFactoryOrDie(metricsClientFactory, manager.GetRESTMapper(), manager.GetConfig())
Expand All @@ -74,10 +74,8 @@ func main() {
&scalablenodegroupv1alpha1.Controller{CloudProvider: cloudProviderFactory},
&metricsproducerv1alpha1.Controller{ProducerFactory: metricsProducerFactory},
&provisionerv1alpha1.Controller{
Client: manager.GetClient(),
Allocator: &allocation.GreedyAllocator{
Capacity: cloudProviderFactory.Capacity(),
},
Client: manager.GetClient(),
Allocator: &allocation.GreedyAllocator{Capacity: cloudProviderFactory.Capacity()},
},
).Start(controllerruntime.SetupSignalHandler()); err != nil {
zap.S().Panicf("Unable to start manager, %w", err)
Expand Down
13 changes: 13 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,16 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- create
87 changes: 59 additions & 28 deletions docs/aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,54 @@ This command will create an IAM Policy with access to all of the resources for a
```
aws iam create-policy --policy-name Karpenter --policy-document "$(cat <<-EOM
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"eks:DescribeNodegroup",
"eks:UpdateNodegroupConfig"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:UpdateAutoScalingGroup"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl"
],
"Effect": "Allow",
"Resource": "*"
}
]
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"eks:DescribeNodegroup",
"eks:UpdateNodegroupConfig"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:UpdateAutoScalingGroup"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"ec2:DescribeLaunchTemplates",
"ec2:CreateLaunchTemplate",
"ec2:CreateFleet",
"ec2:RunInstances",
"ec2:DescribeInstances",
"ec2:CreateTags",
"ec2:DescribeSubnets",
"eks:DescribeCluster",
"iam:GetRole",
"iam:CreateRole",
"iam:AddRoleToInstanceProfile",
"iam:PassRole",
"iam:GetInstanceProfile",
"iam:CreateInstanceProfile",
"iam:AttachRolePolicy"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
EOM
)"
Expand Down Expand Up @@ -73,7 +94,17 @@ kubectl delete pods -n karpenter -l control-plane=karpenter
```

### Cleanup
```
```bash
eksctl delete iamserviceaccount --cluster ${CLUSTER_NAME} --name default --namespace karpenter
aws iam delete-policy --policy-arn arn:aws:iam::${AWS_ACCOUNT_ID}:policy/Karpenter

# Remove Karpenter generated resources
aws iam remove-role-from-instance-profile --instance-profile-name KarpenterNodeRole --role-name KarpenterNodeRole
aws iam delete-instance-profile --instance-profile-name KarpenterNodeRole
aws iam detach-role-policy --role-name KarpenterNodeRole --policy-arn arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
aws iam detach-role-policy --role-name KarpenterNodeRole --policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
aws iam detach-role-policy --role-name KarpenterNodeRole --policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
aws iam detach-role-policy --role-name KarpenterNodeRole --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
aws iam delete-role --role-name KarpenterNodeRole
aws ec2 describe-launch-templates | jq -r ".LaunchTemplates[].LaunchTemplateName" | grep KarpenterLaunchTemplate | xargs -I{} aws ec2 delete-launch-template --launch-template-name {}
```
118 changes: 67 additions & 51 deletions pkg/cloudprovider/aws/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,100 @@ package aws
import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/eks/eksiface"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/log"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Capacity struct {
ec2Iface ec2iface.EC2API
Client client.Client
EC2API ec2iface.EC2API
LaunchTemplate *ec2.LaunchTemplate
ZonalSubnets map[string]*ec2.Subnet
}

// NewCapacity constructs a Capacity client for AWS
func NewCapacity(client ec2iface.EC2API) *Capacity {
return &Capacity{ec2Iface: client}
func NewCapacity(EC2API ec2iface.EC2API, EKSAPI eksiface.EKSAPI, IAMAPI iamiface.IAMAPI, client client.Client) *Capacity {
initialization := NewInitialization(EC2API, EKSAPI, IAMAPI, client)
return &Capacity{
EC2API: EC2API,
LaunchTemplate: initialization.LaunchTemplate,
ZonalSubnets: initialization.ZonalSubnets,
Client: client,
}
}

// Create a set of nodes given the constraints
func (cp *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) error {
// TODO Convert contraints to the Node types and select the launch template
func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]*v1.Node, error) {
// TODO, select a zone more intelligently
var zone string
for zone = range c.ZonalSubnets {
}

// Create the desired number of instances based on constraints
// create instances using EC2 fleet API
// TODO remove hard coded values
output, err := cp.ec2Iface.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{
{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateId: aws.String("lt-02f427483e1be00f5"),
Version: aws.String("$Latest"),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{
{
InstanceType: aws.String("m5.large"),
SubnetId: aws.String("subnet-03216d5a693377033"),
AvailabilityZone: aws.String("us-east-2a"),
},
},
},
},
createFleetOutput, err := c.EC2API.CreateFleetWithContext(context.TODO(), &ec2.CreateFleetInput{
Type: aws.String(ec2.FleetTypeInstant),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand),
OnDemandTargetCapacity: aws.Int64(1),
TotalTargetCapacity: aws.Int64(1),
},
Type: aws.String(ec2.FleetTypeInstant),
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateName: c.LaunchTemplate.LaunchTemplateName,
Version: aws.String("$Default"),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{{
AvailabilityZone: aws.String(zone),
InstanceType: aws.String("m5.large"),
SubnetId: c.ZonalSubnets[zone].SubnetId,
}},
}},
})
if err != nil {
return fmt.Errorf("failed to create fleet %w", err)
return nil, fmt.Errorf("creating fleet, %w", err)
}
// TODO Get instanceID from the output
_ = output
// _ = cfg.instanceID
zap.S().Infof("Successfully created a node in zone %v", constraints.Zone)
return nil
}

// calculateResourceListOrDie queries EC2 API and gets the CPU & Mem for a list of instance types
func calculateResourceListOrDie(client ec2iface.EC2API, instanceType []*string) map[string]v1.ResourceList {
output, err := client.DescribeInstanceTypes(
&ec2.DescribeInstanceTypesInput{
InstanceTypes: instanceType,
},
)
var nodes []*v1.Node
var instanceIds []*string
for _, instance := range createFleetOutput.Instances {
instanceIds = append(instanceIds, instance.InstanceIds...)
}

// TODO, add retries to describe instances, since create fleet is eventually consistent.
describeInstancesOutput, err := c.EC2API.DescribeInstances(&ec2.DescribeInstancesInput{InstanceIds: instanceIds})
if err != nil {
log.PanicIfError(err, "Describe instance type request failed")
return nil, fmt.Errorf("describing instances %v, %w", instanceIds, err)
}
var instanceTypes = map[string]v1.ResourceList{}
for _, instance := range output.InstanceTypes {
resourceList := v1.ResourceList{
v1.ResourceCPU: resource.MustParse(strconv.FormatInt(*instance.VCpuInfo.DefaultVCpus, 10)),
v1.ResourceMemory: resource.MustParse(strconv.FormatInt(*instance.MemoryInfo.SizeInMiB, 10)),

for _, reservation := range describeInstancesOutput.Reservations {
for _, instance := range reservation.Instances {
nodes = append(nodes, nodeFrom(instance))
}
instanceTypes[*instance.InstanceType] = resourceList
}
return instanceTypes

return nodes, nil
}

func nodeFrom(instance *ec2.Instance) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: *instance.PrivateDnsName,
},
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("aws:///%s/%s", *instance.Placement.AvailabilityZone, *instance.InstanceId),
},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
// TODO, This value is necessary to avoid OutOfPods failure state. Find a way to set this (and cpu/mem) correctly
v1.ResourcePods: resource.MustParse("100"),
},
},
}
}
29 changes: 20 additions & 9 deletions pkg/cloudprovider/aws/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,40 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go/service/eks/eksiface"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/awslabs/karpenter/pkg/apis/autoscaling/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/utils/log"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Factory struct {
AutoscalingClient autoscalingiface.AutoScalingAPI
SQSClient sqsiface.SQSAPI
EKSClient eksiface.EKSAPI
EC2Client ec2iface.EC2API
SQS sqsiface.SQSAPI
EKS eksiface.EKSAPI
EC2 ec2iface.EC2API
IAM iamiface.IAMAPI
Client client.Client
// TODO, dedup this with client. Currently necessary due to the client not
// working until mgr.Start() is called, which breaks initialization logic.
Config *rest.Config
}

func NewFactory(options cloudprovider.Options) *Factory {
sess := withRegion(session.Must(session.NewSession()))
return &Factory{
AutoscalingClient: autoscaling.New(sess),
EKSClient: eks.New(sess),
SQSClient: sqs.New(sess),
EC2Client: ec2.New(sess),
EKS: eks.New(sess),
SQS: sqs.New(sess),
EC2: ec2.New(sess),
IAM: iam.New(sess),
Client: options.Client,
Config: options.Config,
}
}

Expand All @@ -57,7 +66,7 @@ func (f *Factory) NodeGroupFor(spec *v1alpha1.ScalableNodeGroupSpec) cloudprovid
case v1alpha1.AWSEC2AutoScalingGroup:
return NewAutoScalingGroup(spec.ID, f.AutoscalingClient)
case v1alpha1.AWSEKSNodeGroup:
return NewManagedNodeGroup(spec.ID, f.EKSClient, f.AutoscalingClient, f.Client)
return NewManagedNodeGroup(spec.ID, f.EKS, f.AutoscalingClient, f.Client)
default:
return fake.NewNotImplementedFactory().NodeGroupFor(spec)
}
Expand All @@ -66,14 +75,16 @@ func (f *Factory) NodeGroupFor(spec *v1alpha1.ScalableNodeGroupSpec) cloudprovid
func (f *Factory) QueueFor(spec *v1alpha1.QueueSpec) cloudprovider.Queue {
switch spec.Type {
case v1alpha1.AWSSQSQueueType:
return NewSQSQueue(spec.ID, f.SQSClient)
return NewSQSQueue(spec.ID, f.SQS)
default:
return fake.NewNotImplementedFactory().QueueFor(spec)
}
}

func (f *Factory) Capacity() cloudprovider.Capacity {
return NewCapacity(f.EC2Client)
kubeClient, err := client.New(f.Config, client.Options{})
log.PanicIfError(err, "Failed to instantiate kubeClient")
return NewCapacity(f.EC2, f.EKS, f.IAM, kubeClient)
}

func withRegion(sess *session.Session) *session.Session {
Expand Down
Loading

0 comments on commit 14342da

Please sign in to comment.