perf: Cache requirements for pods alongside requests (#1950)
jonathan-innis authored Feb 5, 2025
1 parent d9d8c73 commit e86d1ef
Showing 4 changed files with 46 additions and 38 deletions.
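In short: the scheduler previously cached only each pod's resource requests (keyed by UID) and recomputed the pod's scheduling requirements on every node or nodeclaim it tried; this commit caches the requirements (and their strict variant) alongside the requests in a single PodData entry. A rough, self-contained sketch of the pattern — simplified, hypothetical types standing in for the upstream scheduling package, not the real code:

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler's types; the real code keys the cache by
// types.UID and stores corev1.ResourceList plus scheduling.Requirements values.
type Requirements map[string][]string

type PodData struct {
	Requests           map[string]int64
	Requirements       Requirements
	StrictRequirements Requirements
}

type Pod struct {
	UID      string
	Requests map[string]int64
}

// Hypothetical stand-in for scheduling.NewPodRequirements: in the real code this
// walks node selectors and affinity terms, which is exactly the work worth caching.
func computeRequirements(p *Pod) Requirements {
	return Requirements{"kubernetes.io/os": {"linux"}}
}

// Before this change, every node.Add(pod) call recomputed the pod's requirements.
// After it, Solve fills the cache once per pod and every Add call reuses the entry.
func buildPodData(cache map[string]*PodData, pods []*Pod) {
	for _, p := range pods {
		reqs := computeRequirements(p)
		cache[p.UID] = &PodData{
			Requests:           p.Requests,
			Requirements:       reqs,
			StrictRequirements: reqs, // the real code narrows this when preferred node affinity is present
		}
	}
}

func main() {
	cache := map[string]*PodData{}
	buildPodData(cache, []*Pod{{UID: "pod-a", Requests: map[string]int64{"cpu": 500}}})
	fmt.Println(cache["pod-a"].Requirements["kubernetes.io/os"]) // [linux]
}
```

The payoff is visible in the diffs below: ExistingNode.Add and NodeClaim.Add no longer call scheduling.NewPodRequirements per scheduling attempt; they read the cached entry.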
18 changes: 5 additions & 13 deletions pkg/controllers/provisioning/scheduling/existingnode.go
@@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,
return node
}

-func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList) error {
+func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil {
return err
@@ -86,29 +86,21 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
-requests := resources.Merge(n.requests, podRequests)
+requests := resources.Merge(n.requests, podData.Requests)

if !resources.Fits(requests, n.cachedAvailable) {
return fmt.Errorf("exceeds node resources")
}

nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...)
-podRequirements := scheduling.NewPodRequirements(pod)
// Check NodeClaim Affinity Requirements
-if err = nodeRequirements.Compatible(podRequirements); err != nil {
+if err = nodeRequirements.Compatible(podData.Requirements); err != nil {
return err
}
-nodeRequirements.Add(podRequirements.Values()...)
-
-strictPodRequirements := podRequirements
-if scheduling.HasPreferredNodeAffinity(pod) {
-// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
-// preferred node affinity. Only required node affinities can actually reduce pod domains.
-strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
-}
+nodeRequirements.Add(podData.Requirements.Values()...)

// Check Topology Requirements
-topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeRequirements, pod)
+topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeRequirements, pod)
if err != nil {
return err
}
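ExistingNode.Add now takes the cached *PodData and, as the in-diff comment notes, rejects on resources first because an in-flight node cannot grow. A minimal sketch of that cheap-first fit check, using hypothetical integer resource lists in place of corev1.ResourceList, resources.Merge, and resources.Fits:

```go
package main

import "fmt"

// Hypothetical simplified resource math; the real code operates on corev1.ResourceList.
type ResourceList map[string]int64

func merge(a, b ResourceList) ResourceList {
	out := ResourceList{}
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		out[k] += v
	}
	return out
}

func fits(requests, available ResourceList) bool {
	for k, v := range requests {
		if v > available[k] {
			return false
		}
	}
	return true
}

// Mirrors the cheap-first ordering in ExistingNode.Add: reject on resources before
// doing any requirement or topology work, since an existing node cannot be resized.
func canAdd(nodeRequests, cachedPodRequests, available ResourceList) error {
	if !fits(merge(nodeRequests, cachedPodRequests), available) {
		return fmt.Errorf("exceeds node resources")
	}
	return nil
}

func main() {
	err := canAdd(ResourceList{"cpu": 1500}, ResourceList{"cpu": 1000}, ResourceList{"cpu": 2000})
	fmt.Println(err) // exceeds node resources
}
```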
19 changes: 6 additions & 13 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem
}
}

-func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
+func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
return err
@@ -76,22 +76,15 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
return fmt.Errorf("checking host port usage, %w", err)
}
nodeClaimRequirements := scheduling.NewRequirements(n.Requirements.Values()...)
-podRequirements := scheduling.NewPodRequirements(pod)

// Check NodeClaim Affinity Requirements
-if err := nodeClaimRequirements.Compatible(podRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
+if err := nodeClaimRequirements.Compatible(podData.Requirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
return fmt.Errorf("incompatible requirements, %w", err)
}
-nodeClaimRequirements.Add(podRequirements.Values()...)
+nodeClaimRequirements.Add(podData.Requirements.Values()...)

-strictPodRequirements := podRequirements
-if scheduling.HasPreferredNodeAffinity(pod) {
-// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
-// preferred node affinity. Only required node affinities can actually reduce pod domains.
-strictPodRequirements = scheduling.NewStrictPodRequirements(pod)
-}
// Check Topology Requirements
-topologyRequirements, err := n.topology.AddRequirements(strictPodRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
+topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
if err != nil {
return err
}
@@ -101,9 +94,9 @@ func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
nodeClaimRequirements.Add(topologyRequirements.Values()...)

// Check instance type combinations
-requests := resources.Merge(n.Spec.Resources.Requests, podRequests)
+requests := resources.Merge(n.Spec.Resources.Requests, podData.Requests)

-remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podRequests, n.daemonResources, requests)
+remaining, err := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, podData.Requests, n.daemonResources, requests)
if err != nil {
// We avoid wrapping this err because calling String() on InstanceTypeFilterError is an expensive operation
// due to calls to resources.Merge and stringifying the nodeClaimRequirements
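NodeClaim.Add follows the same shape and ends by filtering instance types against the merged requests (accumulated pod requests plus daemonset overhead). A simplified sketch of that filtering step — hypothetical types, and ignoring the label-requirement and offering checks the real filterInstanceTypesByRequirements also performs:

```go
package main

import "fmt"

type ResourceList map[string]int64

type InstanceType struct {
	Name     string
	Capacity ResourceList
}

func fits(requests, capacity ResourceList) bool {
	for k, v := range requests {
		if v > capacity[k] {
			return false
		}
	}
	return true
}

// Rough analogue of the request-fit part of filterInstanceTypesByRequirements:
// keep only instance types whose capacity covers the accumulated pod requests
// plus daemonset overhead.
func filterByRequests(its []InstanceType, accumulated, podRequests, daemonOverhead ResourceList) []InstanceType {
	total := ResourceList{}
	for _, rl := range []ResourceList{accumulated, podRequests, daemonOverhead} {
		for k, v := range rl {
			total[k] += v
		}
	}
	var remaining []InstanceType
	for _, it := range its {
		if fits(total, it.Capacity) {
			remaining = append(remaining, it)
		}
	}
	return remaining
}

func main() {
	its := []InstanceType{
		{Name: "small", Capacity: ResourceList{"cpu": 2000, "memory": 4 << 30}},
		{Name: "large", Capacity: ResourceList{"cpu": 8000, "memory": 32 << 30}},
	}
	out := filterByRequests(its, ResourceList{"cpu": 1500}, ResourceList{"cpu": 1000}, ResourceList{"cpu": 200})
	fmt.Println(len(out), out[0].Name) // 1 large
}
```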
10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/scheduling/queue.go
@@ -34,8 +34,8 @@ type Queue struct {
}

// NewQueue constructs a new queue given the input pods, sorting them to optimize for bin-packing into nodes.
-func NewQueue(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) *Queue {
-sort.Slice(pods, byCPUAndMemoryDescending(pods, podRequests))
+func NewQueue(pods []*v1.Pod, podData map[types.UID]*PodData) *Queue {
+sort.Slice(pods, byCPUAndMemoryDescending(pods, podData))
return &Queue{
pods: pods,
lastLen: map[types.UID]int{},
@@ -73,13 +73,13 @@ func (q *Queue) List() []*v1.Pod {
return q.pods
}

-func byCPUAndMemoryDescending(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) func(i int, j int) bool {
+func byCPUAndMemoryDescending(pods []*v1.Pod, podData map[types.UID]*PodData) func(i int, j int) bool {
return func(i, j int) bool {
lhsPod := pods[i]
rhsPod := pods[j]

-lhs := podRequests[lhsPod.UID]
-rhs := podRequests[rhsPod.UID]
+lhs := podData[lhsPod.UID].Requests
+rhs := podData[rhsPod.UID].Requests

cpuCmp := resources.Cmp(lhs[v1.ResourceCPU], rhs[v1.ResourceCPU])
if cpuCmp < 0 {
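The queue sorts pods by CPU and then memory, descending, so the largest pods are placed first for better bin-packing; the comparator now pulls requests out of the cached PodData instead of a separate requests map. A small sketch of the same ordering over a local slice, using the real apimachinery Quantity type (local names like podData/sortForBinPacking are illustrative, not Karpenter's):

```go
package main

import (
	"fmt"
	"sort"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

type podData struct {
	name     string
	requests corev1.ResourceList
}

// Descending by CPU, then by memory — the same ordering as byCPUAndMemoryDescending,
// but over a local slice instead of a UID-keyed cache.
func sortForBinPacking(pods []podData) {
	sort.Slice(pods, func(i, j int) bool {
		lhs, rhs := pods[i].requests, pods[j].requests
		lhsCPU, rhsCPU := lhs[corev1.ResourceCPU], rhs[corev1.ResourceCPU]
		if c := lhsCPU.Cmp(rhsCPU); c != 0 {
			return c > 0
		}
		lhsMem, rhsMem := lhs[corev1.ResourceMemory], rhs[corev1.ResourceMemory]
		return lhsMem.Cmp(rhsMem) > 0
	})
}

func main() {
	pods := []podData{
		{name: "small", requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("250m")}},
		{name: "big", requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")}},
	}
	sortForBinPacking(pods)
	fmt.Println(pods[0].name) // big
}
```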
37 changes: 30 additions & 7 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -77,7 +77,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
topology: topology,
cluster: cluster,
daemonOverhead: getDaemonOverhead(templates, daemonSetPods),
-cachedPodRequests: map[types.UID]corev1.ResourceList{}, // cache pod requests to avoid having to continually recompute this total
+cachedPodData: map[types.UID]*PodData{}, // cache pod data to avoid having to continually recompute it
recorder: recorder,
preferences: &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
@@ -89,14 +89,20 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
return s
}

+type PodData struct {
+Requests corev1.ResourceList
+Requirements scheduling.Requirements
+StrictRequirements scheduling.Requirements
+}

type Scheduler struct {
id types.UID // Unique UUID attached to this scheduling loop
newNodeClaims []*NodeClaim
existingNodes []*ExistingNode
nodeClaimTemplates []*NodeClaimTemplate
remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool
daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList
-cachedPodRequests map[types.UID]corev1.ResourceList // (Pod Namespace/Name) -> calculated resource requests for the pod
+cachedPodData map[types.UID]*PodData // (Pod Namespace/Name) -> pre-computed data for pods to avoid re-computation and memory usage
preferences *Preferences
topology *Topology
cluster *state.Cluster
@@ -217,9 +223,9 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
UnschedulablePodsCount.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
QueueDepth.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
for _, p := range pods {
-s.cachedPodRequests[p.UID] = resources.RequestsForPods(p)
+s.updateCachedPodData(p)
}
-q := NewQueue(pods, s.cachedPodRequests)
+q := NewQueue(pods, s.cachedPodData)

startTime := s.clock.Now()
lastLogTime := s.clock.Now()
@@ -251,6 +257,8 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
if err := s.topology.Update(ctx, pod); err != nil {
log.FromContext(ctx).Error(err, "failed updating topology")
}
+// Update the cached podData since the pod was relaxed and it could have changed its requirement set
+s.updateCachedPodData(pod)
}
}
UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)})
@@ -265,10 +273,25 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
}
}

+func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {
+requirements := scheduling.NewPodRequirements(p)
+strictRequirements := requirements
+if scheduling.HasPreferredNodeAffinity(p) {
+// strictPodRequirements is important as it ensures we don't inadvertently restrict the possible pod domains by a
+// preferred node affinity. Only required node affinities can actually reduce pod domains.
+strictRequirements = scheduling.NewStrictPodRequirements(p)
+}
+s.cachedPodData[p.UID] = &PodData{
+Requests: resources.RequestsForPods(p),
+Requirements: requirements,
+StrictRequirements: strictRequirements,
+}
+}

func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
-if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil {
+if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
return nil
}
}
@@ -278,7 +301,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {

// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
-if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
+if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err == nil {
return nil
}
}
@@ -299,7 +322,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
}
}
nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
-if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil {
+if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err != nil {
nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim
errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w",
nodeClaimTemplate.NodePoolName,
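The one subtlety in Solve is the new updateCachedPodData call after a pod is relaxed: relaxation strips preferences from the pod, which changes its requirement set, so the cached entry must be recomputed or later attempts would schedule against stale requirements. A toy sketch of that relax-then-refresh flow, with hypothetical stand-ins for the preference relaxation and the requirement computation:

```go
package main

import "fmt"

// Minimal sketch of the relax-then-recompute step in the Solve loop.
type pod struct {
	uid       string
	preferred []string // preferred node-affinity terms still attached to the pod
}

type podData struct {
	requirements []string
}

// Stand-in for scheduling.NewPodRequirements / NewStrictPodRequirements.
func computeRequirements(p *pod) []string {
	return append([]string{"required-terms"}, p.preferred...)
}

// relax drops one preference, returning false once nothing is left to relax.
func relax(p *pod) bool {
	if len(p.preferred) == 0 {
		return false
	}
	p.preferred = p.preferred[:len(p.preferred)-1]
	return true
}

func main() {
	cache := map[string]*podData{}
	p := &pod{uid: "pod-a", preferred: []string{"prefer-zone-a"}}
	cache[p.uid] = &podData{requirements: computeRequirements(p)}

	// Scheduling failed: relax the pod, then refresh the cache entry so later
	// attempts see the reduced requirement set instead of the stale one.
	if relax(p) {
		cache[p.uid] = &podData{requirements: computeRequirements(p)}
	}
	fmt.Println(cache[p.uid].requirements) // [required-terms]
}
```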
