Experimental support for volume topology aware scheduling
ellistarn committed Dec 30, 2021
1 parent 77017ac commit cec6215
Showing 15 changed files with 392 additions and 97 deletions.
8 changes: 7 additions & 1 deletion charts/karpenter/templates/controller/rbac.yaml
@@ -49,6 +49,12 @@ rules:
- apiGroups: ["karpenter.sh"]
resources: ["provisioners", "provisioners/status"]
verbs: ["create", "delete", "patch", "get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes", "persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["create", "get", "patch", "update", "watch"]
@@ -67,4 +73,4 @@ rules:
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["list", "watch"]
---
---
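
The added rules cover the new volume lookups in the provisioner: reading StorageClasses and reading/updating PersistentVolumeClaims (the list/watch verbs back the informer caches). A minimal sketch, not from this commit, of the controller-runtime calls the get/update verbs authorize; the object names are illustrative assumptions:

```go
package rbacexample

import (
	"context"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// annotateSelectedNode sketches the API calls the new RBAC verbs permit.
func annotateSelectedNode(ctx context.Context, c client.Client, nodeName string) error {
	// storageclasses: get (class name is a placeholder)
	sc := &storagev1.StorageClass{}
	if err := c.Get(ctx, types.NamespacedName{Name: "ebs-topology-aware"}, sc); err != nil {
		return err
	}
	// persistentvolumeclaims: get (claim name/namespace are placeholders)
	pvc := &v1.PersistentVolumeClaim{}
	if err := c.Get(ctx, types.NamespacedName{Namespace: "default", Name: "data"}, pvc); err != nil {
		return err
	}
	if pvc.Annotations == nil {
		pvc.Annotations = map[string]string{}
	}
	pvc.Annotations["volume.kubernetes.io/selected-node"] = nodeName
	// persistentvolumeclaims: update
	return c.Update(ctx, pvc)
}
```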
4 changes: 2 additions & 2 deletions pkg/apis/provisioning/v1alpha5/constraints.go
@@ -56,7 +56,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error {
}
}
// The combined requirements are not compatible
combined := c.Requirements.With(podRequirements)
combined := c.Requirements.Add(podRequirements...)
for _, key := range podRequirements.Keys() {
if combined.Requirement(key).Len() == 0 {
return fmt.Errorf("invalid nodeSelector %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList())
@@ -68,7 +68,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error {
func (c *Constraints) Tighten(pod *v1.Pod) *Constraints {
return &Constraints{
Labels: c.Labels,
Requirements: c.Requirements.With(PodRequirements(pod)).Consolidate().WellKnown(),
Requirements: c.Requirements.Add(PodRequirements(pod)...).Consolidate().WellKnown(),
Taints: c.Taints,
Provider: c.Provider,
KubeletConfiguration: c.KubeletConfiguration,
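
The `With` → `Add` rename means pod requirements are normalized (see `requirements.go` below) as they are merged into the provisioner's constraints. A small sketch of the incompatibility check that `ValidatePod` performs, assuming the `v1alpha5` API shown in this diff and illustrative zone values:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

func main() {
	// Provisioner offers only us-west-2a; the pod insists on us-west-2b.
	provisionerReqs := v1alpha5.Requirements{}.Add(v1.NodeSelectorRequirement{
		Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2a"},
	})
	podReqs := v1alpha5.Requirements{}.Add(v1.NodeSelectorRequirement{
		Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2b"},
	})

	// Requirement(key) reports the values still allowed for a key; an empty
	// set after combining means the pod cannot fit this provisioner.
	combined := provisionerReqs.Add(podReqs...)
	if combined.Requirement(v1.LabelTopologyZone).Len() == 0 {
		fmt.Println("invalid nodeSelector: pod and provisioner zones do not intersect")
	}
}
```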
53 changes: 5 additions & 48 deletions pkg/apis/provisioning/v1alpha5/register.go
@@ -17,61 +17,13 @@ package v1alpha5
import (
"context"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"
)

var (
ArchitectureAmd64 = "amd64"
ArchitectureArm64 = "arm64"
OperatingSystemLinux = "linux"

ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/provisioner-name"
NotReadyTaintKey = SchemeGroupVersion.Group + "/not-ready"
DoNotEvictPodAnnotationKey = SchemeGroupVersion.Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = SchemeGroupVersion.Group + "/emptiness-timestamp"
TerminationFinalizer = SchemeGroupVersion.Group + "/termination"
DefaultProvisioner = types.NamespacedName{Name: "default"}
)

var (
// RestrictedLabels are injected by Cloud Providers
RestrictedLabels = sets.NewString(
// Used internally by provisioning logic
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
)

// AllowedLabelDomains are domains that may be restricted, but that is allowed because
// they are not used in a context where they may be passed as argument to kubelet.
// AllowedLabelDomains are evaluated before RestrictedLabelDomains
AllowedLabelDomains = sets.NewString(
"kops.k8s.io",
)

// These are either prohibited by the kubelet or reserved by karpenter
// They are evaluated after AllowedLabelDomains
KarpenterLabelDomain = "karpenter.sh"
RestrictedLabelDomains = sets.NewString(
"kubernetes.io",
"k8s.io",
KarpenterLabelDomain,
)
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
// WellKnownLabels supported by karpenter
WellKnownLabels = sets.NewString(
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
v1.LabelArchStable,
v1.LabelOSStable,
LabelCapacityType,
v1.LabelHostname, // Used internally for hostname topology spread
)
DefaultHook = func(ctx context.Context, constraints *Constraints) {}
ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
)
@@ -88,6 +40,11 @@ var (
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
})
ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/provisioner-name"
NotReadyTaintKey = SchemeGroupVersion.Group + "/not-ready"
DoNotEvictPodAnnotationKey = SchemeGroupVersion.Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = SchemeGroupVersion.Group + "/emptiness-timestamp"
TerminationFinalizer = SchemeGroupVersion.Group + "/termination"
)

const (
111 changes: 87 additions & 24 deletions pkg/apis/provisioning/v1alpha5/requirements.go
@@ -21,6 +21,55 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

var (
ArchitectureAmd64 = "amd64"
ArchitectureArm64 = "arm64"
OperatingSystemLinux = "linux"

// RestrictedLabels are injected by Cloud Providers
RestrictedLabels = sets.NewString(
// Used internally by provisioning logic
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
)

// AllowedLabelDomains are domains that may be restricted, but that is allowed because
// they are not used in a context where they may be passed as argument to kubelet.
// AllowedLabelDomains are evaluated before RestrictedLabelDomains
AllowedLabelDomains = sets.NewString(
"kops.k8s.io",
)

// These are either prohibited by the kubelet or reserved by karpenter
// They are evaluated after AllowedLabelDomains
KarpenterLabelDomain = "karpenter.sh"
RestrictedLabelDomains = sets.NewString(
"kubernetes.io",
"k8s.io",
KarpenterLabelDomain,
)
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
// WellKnownLabels supported by karpenter
WellKnownLabels = sets.NewString(
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
v1.LabelArchStable,
v1.LabelOSStable,
LabelCapacityType,
v1.LabelHostname, // Used internally for hostname topology spread
)
// NormalizedLabels translate aliased concepts into the controller's
// WellKnownLabels. Pod requirements are translated for compatibility,
// however, Provisioner labels are still restricted to WellKnownLabels.
// Additional labels may be injected by cloud providers.
NormalizedLabels = map[string]string{
v1.LabelFailureDomainBetaZone: v1.LabelTopologyZone,
"beta.kubernetes.io/arch": v1.LabelArchStable,
"beta.kubernetes.io/os": v1.LabelOSStable,
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
}
)

// Requirements is a decorated alias type for []v1.NodeSelectorRequirements
type Requirements []v1.NodeSelectorRequirement

@@ -44,58 +93,72 @@ func (r Requirements) CapacityTypes() sets.String {
return r.Requirement(LabelCapacityType)
}

func (r Requirements) With(requirements Requirements) Requirements {
return append(r, requirements...)
func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requirements {
return append(r, Requirements(requirements).Normalize()...)
}

// Normalize the requirements to use WellKnownLabels
func (r Requirements) Normalize() Requirements {
normalized := Requirements{}
for _, requirement := range r {
label := requirement.Key
if normalized, ok := NormalizedLabels[requirement.Key]; ok {
label = normalized
}
normalized = append(normalized, v1.NodeSelectorRequirement{Key: label, Operator: requirement.Operator, Values: requirement.Values})
}
return normalized
}

// Consolidate combines In and NotIn requirements for each unique key, producing
// an equivalent minimal representation of the requirements. This is useful as
// requirements may be appended from a variety of sources and then consolidated.
// Caution: If a key contains a `NotIn` operator without a corresponding
// `In` operator, the requirement will permanently be [] after consolidation. To
// avoid this, include the broadest `In` requirements before consolidating.
func (r Requirements) Consolidate() (requirements Requirements) {
for _, key := range r.Keys() {
requirements = requirements.Add(v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: r.Requirement(key).UnsortedList(),
})
}
return requirements
}

func LabelRequirements(labels map[string]string) (r Requirements) {
for key, value := range labels {
r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
r = r.Add(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
return r
}

func PodRequirements(pod *v1.Pod) (r Requirements) {
for key, value := range pod.Spec.NodeSelector {
r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
r = r.Add(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
return r
}
// Select heaviest preference and treat as a requirement. An outer loop will iteratively unconstrain them if unsatisfiable.
if preferred := pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(preferred) > 0 {
sort.Slice(preferred, func(i int, j int) bool { return preferred[i].Weight > preferred[j].Weight })
r = append(r, preferred[0].Preference.MatchExpressions...)
r = r.Add(preferred[0].Preference.MatchExpressions...)
}
// Select first requirement. An outer loop will iteratively remove OR requirements if unsatisfiable
if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil &&
len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 {
r = append(r, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
r = r.Add(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
}
return r
}

// Consolidate combines In and NotIn requirements for each unique key, producing
// an equivalent minimal representation of the requirements. This is useful as
// requirements may be appended from a variety of sources and then consolidated.
// Caution: If a key contains a `NotIn` operator without a corresponding
// `In` operator, the requirement will permanently be [] after consolidation. To
// avoid this, include the broadest `In` requirements before consolidating.
func (r Requirements) Consolidate() (requirements Requirements) {
for _, key := range r.Keys() {
requirements = append(requirements, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: r.Requirement(key).UnsortedList(),
})
}
return requirements
}

func (r Requirements) WellKnown() (requirements Requirements) {
for _, requirement := range r {
if WellKnownLabels.Has(requirement.Key) {
requirements = append(requirements, requirement)
requirements = requirements.Add(requirement)

}
}
return requirements
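
`Add` now funnels everything through `Normalize`, so requirements written against deprecated or aliased keys are rewritten to the controller's well-known labels before any other logic sees them. A minimal sketch under the API above; the zone values are illustrative:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

func main() {
	// A requirement still using the deprecated beta zone key.
	reqs := v1alpha5.Requirements{}.Add(v1.NodeSelectorRequirement{
		Key:      v1.LabelFailureDomainBetaZone, // "failure-domain.beta.kubernetes.io/zone"
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{"us-west-2a", "us-west-2b"},
	})

	// The key was rewritten to topology.kubernetes.io/zone on the way in.
	fmt.Println(reqs.Requirement(v1.LabelTopologyZone).UnsortedList()) // [us-west-2a us-west-2b] in some order
	fmt.Println(reqs.Requirement(v1.LabelFailureDomainBetaZone).Len()) // 0
}
```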
5 changes: 5 additions & 0 deletions pkg/cloudprovider/aws/cloudprovider.go
@@ -28,6 +28,7 @@ import (
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/project"

"go.uber.org/multierr"
@@ -54,6 +55,10 @@ const (
CacheCleanupInterval = 10 * time.Minute
)

func init() {
v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{"topology.ebs.csi.aws.com/zone": v1.LabelTopologyZone})
}

type CloudProvider struct {
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
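
The `init` above teaches the controller that the EBS CSI driver's zone key is another spelling of the zone label. Dynamically provisioned EBS volumes carry node affinity keyed by `topology.ebs.csi.aws.com/zone`, roughly like the illustrative PersistentVolume below (the zone value is an assumption); registering the key in `NormalizedLabels` lets such affinity be treated as an ordinary `topology.kubernetes.io/zone` requirement:

```go
package awsexample

import v1 "k8s.io/api/core/v1"

// examplePV sketches the node affinity the EBS CSI driver typically stamps on
// a dynamically provisioned volume. With "topology.ebs.csi.aws.com/zone"
// registered in NormalizedLabels, this reads as a zone requirement.
var examplePV = &v1.PersistentVolume{
	Spec: v1.PersistentVolumeSpec{
		NodeAffinity: &v1.VolumeNodeAffinity{
			Required: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{{
					MatchExpressions: []v1.NodeSelectorRequirement{{
						Key:      "topology.ebs.csi.aws.com/zone",
						Operator: v1.NodeSelectorOpIn,
						Values:   []string{"us-west-2a"},
					}},
				}},
			},
		},
	},
}
```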
3 changes: 2 additions & 1 deletion pkg/cloudprovider/aws/suite_test.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"strings"
"testing"

"github.com/Pallinder/go-randomdata"
@@ -114,7 +115,7 @@ var _ = Describe("Allocation", func() {
provider = &v1alpha1.AWS{
InstanceProfile: "test-instance-profile",
}
provisioner = ProvisionerWithProvider(&v1alpha5.Provisioner{ObjectMeta: metav1.ObjectMeta{Name: v1alpha5.DefaultProvisioner.Name}}, provider)
provisioner = ProvisionerWithProvider(&v1alpha5.Provisioner{ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName())}}, provider)
provisioner.SetDefaults(ctx)
fakeEC2API.Reset()
launchTemplateCache.Flush()
2 changes: 1 addition & 1 deletion pkg/controllers/node/suite_test.go
@@ -61,7 +61,7 @@ var _ = Describe("Controller", func() {
var provisioner *v1alpha5.Provisioner
BeforeEach(func() {
provisioner = &v1alpha5.Provisioner{
ObjectMeta: metav1.ObjectMeta{Name: v1alpha5.DefaultProvisioner.Name},
ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName())},
Spec: v1alpha5.ProvisionerSpec{},
}
})
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/controller.go
@@ -99,8 +99,8 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
}
provisioner.Spec.Labels = functional.UnionStringMaps(provisioner.Spec.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name})
provisioner.Spec.Requirements = provisioner.Spec.Requirements.
With(requirements(instanceTypes)).
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)).
Add(requirements(instanceTypes)...).
Add(v1alpha5.LabelRequirements(provisioner.Spec.Labels)...).
Consolidate()
// Update the provisioner if anything has changed
if c.hasChanged(ctx, provisioner) {
30 changes: 24 additions & 6 deletions pkg/controllers/provisioning/provisioner.go
@@ -202,11 +202,11 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai
return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.bind(ctx, node, <-pods)
return p.create(ctx, node, <-pods)
})
}

func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

// Add the Karpenter finalizer to the node to enable the termination workflow
@@ -237,10 +237,8 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (
// Bind pods
var bound int64
workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) {
pod := pods[i]
binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}
if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error())
if err := p.bind(ctx, node, pods[i]); err != nil {
logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pods[i].Namespace, pods[i].Name, node.Name, err.Error())
} else {
atomic.AddInt64(&bound, 1)
}
@@ -249,6 +247,26 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (
return nil
}

func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
if err := p.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("binding pod %s/%s, %w", pod.Name, pod.Namespace, err)
}
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
pvc := &v1.PersistentVolumeClaim{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil {
return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err)
}
pvc.Annotations["volume.kubernetes.io/selected-node"] = node.Name
if err := p.kubeClient.Update(ctx, pvc); err != nil {
return fmt.Errorf("binding persistent volume claim %s/%s to node %q, %w", pvc.Namespace, pvc.Name, node.Name, err)
}
}
return nil
}

var bindTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
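
Splitting `bind` out of `create` is the heart of the feature: after binding the pod, the controller writes `volume.kubernetes.io/selected-node` onto each PersistentVolumeClaim the pod mounts. For a StorageClass with `volumeBindingMode: WaitForFirstConsumer`, the external provisioner waits for exactly that annotation and then creates the volume in the selected node's topology (for EBS, its zone). A hedged sketch of the kind of class and claim this targets; names and sizes are illustrative, not from this commit:

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	waitForConsumer := storagev1.VolumeBindingWaitForFirstConsumer

	// StorageClass whose volumes are only provisioned once a node is selected.
	sc := storagev1.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "ebs-topology-aware"}, // placeholder name
		Provisioner:       "ebs.csi.aws.com",
		VolumeBindingMode: &waitForConsumer,
	}

	// A claim against that class. Until the selected-node annotation is set,
	// it stays Pending and no EBS volume exists yet.
	scName := sc.Name
	pvc := v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "data", Namespace: "default"},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			StorageClassName: &scName,
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("10Gi")},
			},
		},
	}
	_ = pvc
}
```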
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
@@ -68,7 +68,7 @@ var _ = AfterSuite(func() {

var _ = BeforeEach(func() {
provisioner = &v1alpha5.Provisioner{
ObjectMeta: metav1.ObjectMeta{Name: v1alpha5.DefaultProvisioner.Name},
ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName())},
Spec: v1alpha5.ProvisionerSpec{},
}
provisioner.SetDefaults(ctx)
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/topology.go
@@ -46,7 +46,7 @@ func (t *Topology) Inject(ctx context.Context, constraints *v1alpha5.Constraints
return fmt.Errorf("computing topology, %w", err)
}
for _, pod := range topologyGroup.Pods {
domain := topologyGroup.NextDomain(constraints.Requirements.With(v1alpha5.PodRequirements(pod)).Requirement(topologyGroup.Constraint.TopologyKey))
domain := topologyGroup.NextDomain(constraints.Requirements.Add(v1alpha5.PodRequirements(pod)...).Requirement(topologyGroup.Constraint.TopologyKey))
pod.Spec.NodeSelector = functional.UnionStringMaps(pod.Spec.NodeSelector, map[string]string{topologyGroup.Constraint.TopologyKey: domain})
}
}
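
The change above folds each pod's own requirements into the constraint before `NextDomain` picks a spread domain, so spreading only considers domains the pod could actually land in. An illustrative sketch of a pod that both spreads across zones and restricts them via node affinity; only the intersection (`us-west-2a`/`us-west-2b`) would be eligible, and all names are assumptions:

```go
package topologyexample

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// examplePod spreads across zones, but its node affinity excludes every zone
// other than us-west-2a and us-west-2b, so NextDomain should only be handed
// those two domains.
var examplePod = &v1.Pod{
	Spec: v1.PodSpec{
		TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
			MaxSkew:           1,
			TopologyKey:       v1.LabelTopologyZone,
			WhenUnsatisfiable: v1.DoNotSchedule,
			LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		}},
		Affinity: &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{{
						MatchExpressions: []v1.NodeSelectorRequirement{{
							Key:      v1.LabelTopologyZone,
							Operator: v1.NodeSelectorOpIn,
							Values:   []string{"us-west-2a", "us-west-2b"},
						}},
					}},
				},
			},
		},
	},
}
```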
(Remaining changed files not shown.)
