Experimental support for volume topology aware scheduling
ellistarn committed Dec 17, 2021
1 parent b405f79 commit ffd50ce
Showing 12 changed files with 355 additions and 67 deletions.
1 change: 1 addition & 0 deletions aws-ebs-csi-driver
Submodule aws-ebs-csi-driver added at e7b739
8 changes: 7 additions & 1 deletion charts/karpenter/templates/controller/rbac.yaml
@@ -49,6 +49,12 @@ rules:
- apiGroups: ["karpenter.sh"]
resources: ["provisioners", "provisioners/status"]
verbs: ["create", "delete", "patch", "get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes", "persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["create", "get", "patch", "update", "watch"]
@@ -67,4 +73,4 @@ rules:
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["list", "watch"]
---
---
4 changes: 2 additions & 2 deletions pkg/apis/provisioning/v1alpha5/constraints.go
@@ -53,7 +53,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error {
}
}
// The combined requirements are not compatible
combined := c.Requirements.With(podRequirements)
combined := c.Requirements.Append(podRequirements...)
for _, key := range podRequirements.Keys() {
if combined.Requirement(key).Len() == 0 {
return fmt.Errorf("invalid nodeSelector %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList())
@@ -65,7 +65,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error {
func (c *Constraints) Tighten(pod *v1.Pod) *Constraints {
return &Constraints{
Labels: c.Labels,
Requirements: c.Requirements.With(PodRequirements(pod)).Consolidate().WellKnown(),
Requirements: c.Requirements.Append(PodRequirements(pod)...).Consolidate().WellKnown(),
Taints: c.Taints,
Provider: c.Provider,
}
25 changes: 0 additions & 25 deletions pkg/apis/provisioning/v1alpha5/register.go
@@ -17,12 +17,10 @@ package v1alpha5
import (
"context"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"
)

@@ -40,29 +38,6 @@
)

var (
// RestrictedLabels are injected by Cloud Providers
RestrictedLabels = sets.NewString(
// Used internally by provisioning logic
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
)
// These are either prohibited by the kubelet or reserved by karpenter
KarpenterLabelDomain = "karpenter.sh"
RestrictedLabelDomains = sets.NewString(
"kubernetes.io",
"k8s.io",
KarpenterLabelDomain,
)
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
// WellKnownLabels supported by karpenter
WellKnownLabels = sets.NewString(
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
v1.LabelArchStable,
v1.LabelOSStable,
LabelCapacityType,
v1.LabelHostname, // Used internally for hostname topology spread
)
DefaultHook = func(ctx context.Context, constraints *Constraints) {}
ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
)
98 changes: 74 additions & 24 deletions pkg/apis/provisioning/v1alpha5/requirements.go
@@ -21,6 +21,42 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

var (
// RestrictedLabels are injected by Cloud Providers
RestrictedLabels = sets.NewString(
// Used internally by provisioning logic
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
)
// These are either prohibited by the kubelet or reserved by karpenter
KarpenterLabelDomain = "karpenter.sh"
RestrictedLabelDomains = sets.NewString(
"kubernetes.io",
"k8s.io",
KarpenterLabelDomain,
)
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
// WellKnownLabels supported by the controller
WellKnownLabels = sets.NewString(
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
v1.LabelArchStable,
v1.LabelOSStable,
LabelCapacityType,
v1.LabelHostname, // Used internally for hostname topology spread
)
// NormalizedLabels translate aliased concepts into the controller's
// WellKnownLabels. Pod requirements are translated for compatibility;
// however, Provisioner labels are still restricted to WellKnownLabels.
// Additional labels may be injected by cloud providers.
NormalizedLabels = map[string]string{
v1.LabelFailureDomainBetaZone: v1.LabelTopologyZone,
"beta.kubernetes.io/arch": v1.LabelArchStable,
"beta.kubernetes.io/os": v1.LabelOSStable,
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
}
)

// Requirements is a decorated alias type for []v1.NodeSelectorRequirement
type Requirements []v1.NodeSelectorRequirement

@@ -44,58 +80,72 @@ func (r Requirements) CapacityTypes() sets.String {
return r.Requirement(LabelCapacityType)
}

func (r Requirements) With(requirements Requirements) Requirements {
return append(r, requirements...)
func (r Requirements) Append(requirements ...v1.NodeSelectorRequirement) Requirements {
return append(r, Requirements(requirements).Normalize()...)
}

// Normalize the requirements to use WellKnownLabels
func (r Requirements) Normalize() Requirements {
normalized := Requirements{}
for _, requirement := range r {
label := requirement.Key
if normalized, ok := NormalizedLabels[requirement.Key]; ok {
label = normalized
}
normalized = append(normalized, v1.NodeSelectorRequirement{Key: label, Operator: requirement.Operator, Values: requirement.Values})
}
return normalized
}

// Consolidate combines In and NotIn requirements for each unique key, producing
// an equivalent minimal representation of the requirements. This is useful as
// requirements may be appended from a variety of sources and then consolidated.
// Caution: If a key contains a `NotIn` operator without a corresponding
// `In` operator, the requirement will permanently be [] after consolidation. To
// avoid this, include the broadest `In` requirements before consolidating.
func (r Requirements) Consolidate() (requirements Requirements) {
for _, key := range r.Keys() {
requirements = requirements.Append(v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: r.Requirement(key).UnsortedList(),
})
}
return requirements
}

func LabelRequirements(labels map[string]string) (r Requirements) {
for key, value := range labels {
r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
r = r.Append(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
return r
}

func PodRequirements(pod *v1.Pod) (r Requirements) {
for key, value := range pod.Spec.NodeSelector {
r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
r = r.Append(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
return r
}
// Select heaviest preference and treat as a requirement. An outer loop will iteratively unconstrain them if unsatisfiable.
if preferred := pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(preferred) > 0 {
sort.Slice(preferred, func(i int, j int) bool { return preferred[i].Weight > preferred[j].Weight })
r = append(r, preferred[0].Preference.MatchExpressions...)
r = r.Append(preferred[0].Preference.MatchExpressions...)
}
// Select first requirement. An outer loop will iteratively remove OR requirements if unsatisfiable
if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil &&
len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 {
r = append(r, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
r = r.Append(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
}
return r
}

// Consolidate combines In and NotIn requirements for each unique key, producing
// an equivalent minimal representation of the requirements. This is useful as
// requirements may be appended from a variety of sources and then consolidated.
// Caution: If a key contains a `NotIn` operator without a corresponding
// `In` operator, the requirement will permanently be [] after consolidation. To
// avoid this, include the broadest `In` requirements before consolidating.
func (r Requirements) Consolidate() (requirements Requirements) {
for _, key := range r.Keys() {
requirements = append(requirements, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: r.Requirement(key).UnsortedList(),
})
}
return requirements
}

func (r Requirements) WellKnown() (requirements Requirements) {
for _, requirement := range r {
if WellKnownLabels.Has(requirement.Key) {
requirements = append(requirements, requirement)
requirements = requirements.Append(requirement)

}
}
return requirements
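Note: the variadic `Append` now runs every incoming requirement through `Normalize` before storing it. A minimal sketch of the intended flow, assuming `Requirement(key)` intersects `In` values and subtracts `NotIn` values as the `Consolidate` comment implies (output comment reflects that assumption):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

func main() {
	// The beta zone key is an alias; Normalize (called inside Append) rewrites
	// it to v1.LabelTopologyZone via NormalizedLabels before it is appended.
	requirements := v1alpha5.Requirements{}.
		Append(v1.NodeSelectorRequirement{
			Key:      v1.LabelFailureDomainBetaZone,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{"us-west-2a", "us-west-2b"},
		}).
		Append(v1.NodeSelectorRequirement{
			Key:      v1.LabelTopologyZone,
			Operator: v1.NodeSelectorOpNotIn,
			Values:   []string{"us-west-2b"},
		}).
		Consolidate() // collapse to a single In requirement per key

	fmt.Println(requirements.Requirement(v1.LabelTopologyZone).UnsortedList()) // expected: [us-west-2a]
}
```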
5 changes: 5 additions & 0 deletions pkg/cloudprovider/aws/cloudprovider.go
@@ -28,6 +28,7 @@ import (
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/parallel"
"github.com/aws/karpenter/pkg/utils/project"

@@ -55,6 +56,10 @@ const (
CacheCleanupInterval = 10 * time.Minute
)

func init() {
v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{"topology.ebs.csi.aws.com/zone": v1.LabelTopologyZone})
}

type CloudProvider struct {
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
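Note: the `init` hook above is the extension point for provider-specific label aliases. The EBS CSI driver publishes `topology.ebs.csi.aws.com/zone` on nodes, and normalizing it to the stable zone label lets storage topology constraints participate in scheduling like any other zone requirement. A sketch of the same pattern for a hypothetical additional CSI driver key (the key below is illustrative, not part of this commit):

```go
package example

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/aws/karpenter/pkg/utils/functional"
)

func init() {
	// Hypothetical CSI driver topology key, aliased to the stable zone label so
	// that requirements using it are understood by the provisioning logic.
	v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{
		"topology.example.csi.driver/zone": v1.LabelTopologyZone, // illustrative key
	})
}
```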
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/controller.go
@@ -96,8 +96,8 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
}
provisioner.Spec.Labels = functional.UnionStringMaps(provisioner.Spec.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name})
provisioner.Spec.Requirements = provisioner.Spec.Requirements.
With(requirements(instanceTypes)).
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)).
Append(requirements(instanceTypes)...).
Append(v1alpha5.LabelRequirements(provisioner.Spec.Labels)...).
Consolidate()
// Update the provisioner if anything has changed
if c.hasChanged(ctx, provisioner) {
33 changes: 27 additions & 6 deletions pkg/controllers/provisioning/provisioner.go
@@ -202,11 +202,11 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai
return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.bind(ctx, node, <-pods)
return p.create(ctx, node, <-pods)
})
}

func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

// Add the Karpenter finalizer to the node to enable the termination workflow
@@ -237,10 +237,8 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (
// Bind pods
var bound int64
workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) {
pod := pods[i]
binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}
if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error())
if err := p.bind(ctx, node, pods[i]); err != nil {
logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pods[i].Namespace, pods[i].Name, node.Name, err.Error())
} else {
atomic.AddInt64(&bound, 1)
}
@@ -249,6 +247,29 @@
return nil
}

func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
if err := p.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("binding pod %s/%s, %w", pod.Name, pod.Namespace, err)
}
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
pvc := &v1.PersistentVolumeClaim{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil {
return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err)
}
// if _, ok := pvc.Annotations["volume.kubernetes.io/selected-node"]; ok {
// continue
// }
pvc.Annotations["volume.kubernetes.io/selected-node"] = node.Name
if err := p.kubeClient.Update(ctx, pvc); err != nil {
return fmt.Errorf("binding persistent volume claim %s/%s to node %q, %w", pvc.Namespace, pvc.Name, node.Name, err)
}
}
return nil
}

var bindTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
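Note: with `bind` split out per pod, each bound pod's PersistentVolumeClaims are annotated with `volume.kubernetes.io/selected-node`, which steers `WaitForFirstConsumer` volume provisioning onto the node Karpenter just created. A defensive sketch of that step under the same client interfaces (the diff above assigns into `pvc.Annotations` directly, which panics in Go if the map is nil; the guard below is an addition, not part of this commit):

```go
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// selectNodeForClaim mirrors the annotation step above, with a nil-map guard.
func selectNodeForClaim(ctx context.Context, kubeClient client.Client, pvc *v1.PersistentVolumeClaim, nodeName string) error {
	if pvc.Annotations == nil {
		pvc.Annotations = map[string]string{}
	}
	// volume.kubernetes.io/selected-node tells the CSI external-provisioner which
	// node (and therefore which topology domain) to provision the volume for.
	pvc.Annotations["volume.kubernetes.io/selected-node"] = nodeName
	return kubeClient.Update(ctx, pvc)
}
```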
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/topology.go
@@ -46,7 +46,7 @@ func (t *Topology) Inject(ctx context.Context, constraints *v1alpha5.Constraints
return fmt.Errorf("computing topology, %w", err)
}
for _, pod := range topologyGroup.Pods {
domain := topologyGroup.NextDomain(constraints.Requirements.With(v1alpha5.PodRequirements(pod)).Requirement(topologyGroup.Constraint.TopologyKey))
domain := topologyGroup.NextDomain(constraints.Requirements.Append(v1alpha5.PodRequirements(pod)...).Requirement(topologyGroup.Constraint.TopologyKey))
pod.Spec.NodeSelector = functional.UnionStringMaps(pod.Spec.NodeSelector, map[string]string{topologyGroup.Constraint.TopologyKey: domain})
}
}
18 changes: 12 additions & 6 deletions pkg/controllers/selection/controller.go
@@ -37,17 +37,19 @@ import (

// Controller for the resource
type Controller struct {
kubeClient client.Client
provisioners *provisioning.Controller
preferences *Preferences
kubeClient client.Client
provisioners *provisioning.Controller
preferences *Preferences
volumeTopology *VolumeTopology
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, provisioners *provisioning.Controller) *Controller {
return &Controller{
kubeClient: kubeClient,
provisioners: provisioners,
preferences: NewPreferences(),
kubeClient: kubeClient,
provisioners: provisioners,
preferences: NewPreferences(),
volumeTopology: NewVolumeTopology(kubeClient),
}
}

@@ -80,6 +82,10 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
// Relax preferences if pod has previously failed to schedule.
c.preferences.Relax(ctx, pod)
// Inject volume topology requirements
if err := c.volumeTopology.Inject(ctx, pod); err != nil {
return fmt.Errorf("getting volume topology requirements, %w", err)
}
// Pick provisioner
var provisioner *provisioning.Provisioner
provisioners := c.provisioners.List(ctx)
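Note: `VolumeTopology` itself (a new file under pkg/controllers/selection/) is among the changed files not rendered above. A hedged sketch of what an `Inject` along these lines could do — resolve each claim's StorageClass and fold its `allowedTopologies` into the pod's required node affinity — is shown below; the actual implementation in this commit may differ:

```go
package selection

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// VolumeTopology injects the topology constraints of a pod's volumes into the
// pod's node affinity. Sketch only; the file in this commit may differ.
type VolumeTopology struct{ kubeClient client.Client }

func NewVolumeTopology(kubeClient client.Client) *VolumeTopology {
	return &VolumeTopology{kubeClient: kubeClient}
}

func (v *VolumeTopology) Inject(ctx context.Context, pod *v1.Pod) error {
	var requirements []v1.NodeSelectorRequirement
	for _, volume := range pod.Spec.Volumes {
		if volume.PersistentVolumeClaim == nil {
			continue
		}
		pvc := &v1.PersistentVolumeClaim{}
		if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil {
			return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err)
		}
		if pvc.Spec.StorageClassName == nil {
			continue
		}
		storageClass := &storagev1.StorageClass{}
		if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: *pvc.Spec.StorageClassName}, storageClass); err != nil {
			return fmt.Errorf("getting storage class %s, %w", *pvc.Spec.StorageClassName, err)
		}
		// allowedTopologies keys (e.g. topology.ebs.csi.aws.com/zone) are later
		// normalized to the stable zone label via NormalizedLabels.
		for _, term := range storageClass.AllowedTopologies {
			for _, expression := range term.MatchLabelExpressions {
				requirements = append(requirements, v1.NodeSelectorRequirement{
					Key: expression.Key, Operator: v1.NodeSelectorOpIn, Values: expression.Values,
				})
			}
		}
	}
	if len(requirements) == 0 {
		return nil
	}
	// Fold the requirements into every required node selector term, creating the
	// affinity structures if the pod does not declare them.
	if pod.Spec.Affinity == nil {
		pod.Spec.Affinity = &v1.Affinity{}
	}
	if pod.Spec.Affinity.NodeAffinity == nil {
		pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{}
	}
	required := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
	if required == nil {
		required = &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{{}}}
		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = required
	}
	for i := range required.NodeSelectorTerms {
		required.NodeSelectorTerms[i].MatchExpressions = append(required.NodeSelectorTerms[i].MatchExpressions, requirements...)
	}
	return nil
}
```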