diff --git a/pkg/controllers/selection/antiaffinity.go b/pkg/controllers/selection/antiaffinity.go
new file mode 100644
index 000000000000..474fc7f95549
--- /dev/null
+++ b/pkg/controllers/selection/antiaffinity.go
@@ -0,0 +1,96 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package selection
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"knative.dev/pkg/apis"
+)
+
+// NewAntiAffinity constructs an AntiAffinity.
+func NewAntiAffinity() *AntiAffinity {
+	return &AntiAffinity{}
+}
+
+// AllowedAntiAffinityKeys is the set of topology keys accepted on pod anti-affinity terms.
+var AllowedAntiAffinityKeys = sets.NewString(v1.LabelHostname)
+
+// AntiAffinity validates pod anti-affinity terms and translates them into
+// topology spread constraints.
+type AntiAffinity struct{}
+
+// Validate that the affinity terms are supported. Each error is indexed by the
+// term's position in the list built by termsFor: required terms first, then
+// preferred terms.
+func (a *AntiAffinity) Validate(pod *v1.Pod) (errs *apis.FieldError) {
+	for i, term := range a.termsFor(pod) {
+		errs = errs.Also(a.validateTerm(pod, term).ViaIndex(i))
+	}
+	return errs
+}
+
+// validateTerm rejects a term that sets a namespace selector, names a namespace
+// other than the pod's own, or uses a topology key outside AllowedAntiAffinityKeys.
+func (a *AntiAffinity) validateTerm(pod *v1.Pod, term v1.PodAffinityTerm) (errs *apis.FieldError) {
+	if term.NamespaceSelector != nil {
+		errs = errs.Also(apis.ErrDisallowedFields("namespaceSelector"))
+	}
+	for i, namespace := range term.Namespaces {
+		if namespace != pod.Namespace {
+			errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s, cross namespace affinity is not supported", namespace), "namespaces", i))
+		}
+	}
+	if !AllowedAntiAffinityKeys.Has(term.TopologyKey) {
+		// List returns the keys sorted, so the message stays stable if more keys are allowed.
+		errs = errs.Also(apis.ErrInvalidKeyName(fmt.Sprintf("%s not in %v", term.TopologyKey, AllowedAntiAffinityKeys.List()), "topologyKey"))
+	}
+	return errs
+}
+
+// Transform pod anti affinity into topology rules. Every term, required or
+// preferred, is appended as a DoNotSchedule spread constraint with MaxSkew 1.
+func (a *AntiAffinity) Transform(ctx context.Context, pod *v1.Pod) {
+	for _, term := range a.termsFor(pod) {
+		pod.Spec.TopologySpreadConstraints = append(pod.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
+			MaxSkew:           1,
+			TopologyKey:       term.TopologyKey,
+			WhenUnsatisfiable: v1.DoNotSchedule,
+			LabelSelector:     term.LabelSelector,
+		})
+	}
+}
+
+// termsFor returns the pod's required anti-affinity terms followed by its
+// preferred ones. The result is always a fresh slice: appending the preferred
+// terms directly onto the required slice could write into spare capacity of
+// the backing array owned by the pod spec.
+func (a *AntiAffinity) termsFor(pod *v1.Pod) []v1.PodAffinityTerm {
+	if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
+		return nil
+	}
+	antiAffinity := pod.Spec.Affinity.PodAntiAffinity
+	required := antiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+	preferred := antiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
+	terms := make([]v1.PodAffinityTerm, 0, len(required)+len(preferred))
+	terms = append(terms, required...)
+	for _, term := range preferred {
+		terms = append(terms, term.PodAffinityTerm)
+	}
+	return terms
+}
diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go
index a9de508cd0f3..40e07e6bb313 100644
--- a/pkg/controllers/selection/controller.go
+++ b/pkg/controllers/selection/controller.go
@@ -43,6 +43,7 @@ type Controller struct {
 	provisioners   *provisioning.Controller
 	preferences    *Preferences
 	volumeTopology *VolumeTopology
+	antiAffinity   *AntiAffinity
 }
 
 // NewController constructs a controller instance
@@ -52,6 +53,7 @@ func NewController(kubeClient client.Client, provisioners *provisioning.Controll
 		provisioners:   provisioners,
 		preferences:    NewPreferences(),
 		volumeTopology: NewVolumeTopology(kubeClient),
+		antiAffinity:   NewAntiAffinity(),
 	}
 }
 
@@ -69,7 +71,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	if !isProvisionable(pod) {
 		return reconcile.Result{}, nil
 	}
-	if err := validate(pod); err != nil {
+	if err := c.validate(pod); err != nil {
 		logging.FromContext(ctx).Debugf("Ignoring pod, %s", err.Error())
 		return reconcile.Result{}, nil
 	}
@@ -84,6 +86,8 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
 func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
 	// Relax preferences if pod has previously failed to schedule.
 	c.preferences.Relax(ctx, pod)
+	// Translate anti affinity into topology
+	c.antiAffinity.Transform(ctx, pod)
 	// Inject volume topological requirements
 	if err := c.volumeTopology.Inject(ctx, pod); err != nil {
 		return fmt.Errorf("getting volume topology requirements, %w", err)
@@ -120,14 +124,14 @@ func isProvisionable(p *v1.Pod) bool {
 		!pod.IsOwnedByNode(p)
 }
 
-func validate(p *v1.Pod) error {
+func (c *Controller) validate(p *v1.Pod) error {
 	return multierr.Combine(
-		validateAffinity(p),
-		validateTopology(p),
+		c.validateAffinity(p),
+		c.validateTopology(p),
 	)
 }
 
-func validateTopology(pod *v1.Pod) (errs error) {
+func (c *Controller) validateTopology(pod *v1.Pod) (errs error) {
 	for _, constraint := range pod.Spec.TopologySpreadConstraints {
 		if supported := sets.NewString(v1.LabelHostname, v1.LabelTopologyZone); !supported.Has(constraint.TopologyKey) {
 			errs = multierr.Append(errs, fmt.Errorf("unsupported topology key, %s not in %s", constraint.TopologyKey, supported))
@@ -136,16 +140,17 @@ func validateTopology(pod *v1.Pod) (errs error) {
 	return errs
 }
 
-func validateAffinity(pod *v1.Pod) (errs error) {
+func (c *Controller) validateAffinity(pod *v1.Pod) (errs error) {
 	if pod.Spec.Affinity == nil {
 		return nil
 	}
+	if err := c.antiAffinity.Validate(pod).ViaField("affinity.podAntiAffinity"); err != nil {
+		// Append the error directly; using its text as a format string would mangle any '%' in it.
+		errs = multierr.Append(errs, err)
+	}
 	if pod.Spec.Affinity.PodAffinity != nil {
 		errs = multierr.Append(errs, fmt.Errorf("pod affinity is not supported"))
 	}
-	if pod.Spec.Affinity.PodAntiAffinity != nil {
-		errs = multierr.Append(errs, fmt.Errorf("pod anti-affinity is not supported"))
-	}
 	if pod.Spec.Affinity.NodeAffinity != nil {
 		for _, term := range pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
 			errs = multierr.Append(errs, validateNodeSelectorTerm(term.Preference))