Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tzneal committed Mar 28, 2022
1 parent 3bd976e commit 0a44b01
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha5/requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (r *Requirements) UnmarshalJSON(b []byte) error {
return nil
}

func (r *Requirements) String() string {
func (r Requirements) String() string {
var sb strings.Builder
for key, req := range r.requirements {
var values []string
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *Provisioner) launch(ctx context.Context, node *scheduling.Node) error {
if err != nil {
return fmt.Errorf("creating cloud provider machine, %w", err)
}
logging.FromContext(ctx).Infof("Launched %s", node)

if err := mergo.Merge(k8sNode, nodeRequest.Constraints.ToNode()); err != nil {
return fmt.Errorf("merging cloud provider node, %w", err)
}
Expand All @@ -163,6 +163,7 @@ func (p *Provisioner) launch(ctx context.Context, node *scheduling.Node) error {
return fmt.Errorf("creating node %s, %w", k8sNode.Name, err)
}
}
logging.FromContext(ctx).Infof("Created %s", node)
if err := p.bind(ctx, k8sNode, node.Pods); err != nil {
return fmt.Errorf("binding pods, %w", err)
}
Expand Down
48 changes: 11 additions & 37 deletions pkg/controllers/provisioning/scheduling/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func (n Node) Compatible(pod *v1.Pod) error {
for _, it := range n.InstanceTypeOptions {
newSize := resources.Merge(n.reservedResources(it), podRequests)
if cloudprovider.Compatible(it, tightened) &&
n.newPodCanFit(newSize, it) &&
n.hasCompatibleResources(podRequests, it) {
n.newPodCanFit(newSize, it) {
return nil
}
}
Expand All @@ -85,17 +84,12 @@ func (n Node) reservedResources(it cloudprovider.InstanceType) v1.ResourceList {
}

func (n *Node) newPodCanFit(newSize v1.ResourceList, it cloudprovider.InstanceType) bool {
for resourceName, totalQuantity := range it.Resources() {
reservedQuantity := newSize[resourceName]
if reservedQuantity.Cmp(totalQuantity) > 0 {
instanceResources := it.Resources()
for resourceName, totalQuantity := range newSize {
if resources.Cmp(totalQuantity, instanceResources[resourceName]) > 0 {
return false
}
}

instancePodMax := it.Resources()[v1.ResourcePods]
if !instancePodMax.IsZero() && instancePodMax.CmpInt64(int64(len(n.Pods)+1)) < 0 {
return false
}
return true
}

Expand All @@ -109,48 +103,25 @@ func (n *Node) Add(pod *v1.Pod) error {
for _, it := range n.InstanceTypeOptions {
newSize := resources.Merge(n.reservedResources(it), podRequests)
if cloudprovider.Compatible(it, n.Requirements) &&
n.newPodCanFit(newSize, it) &&
n.hasCompatibleResources(resources.RequestsForPods(pod), it) {
n.newPodCanFit(newSize, it) {
instanceTypeOptions = append(instanceTypeOptions, it)
}
}
// have to add the pod after filtering instance types as newPodCanFit() checks if a new pod can fit, including the
// pod count
n.Pods = append(n.Pods, pod)
n.InstanceTypeOptions = instanceTypeOptions
n.podResources = resources.Merge(n.podResources, resources.RequestsForPods(pod))
n.podResources = resources.Merge(n.podResources, podRequests)

if len(n.InstanceTypeOptions) == 0 {
return fmt.Errorf("no instance type satisfied resources %s and requirements %s",
resources.String(resources.RequestsForPods(pod)),
n.Requirements.String())
n.Requirements)
}
return nil
}

// hasCompatibleResources tests if a given node selector and resource request list is compatible with an instance type
func (n Node) hasCompatibleResources(resourceList v1.ResourceList, it cloudprovider.InstanceType) bool {
for name, quantity := range resourceList {
// we don't care if the pod is requesting zero quantity of some resource
if quantity.IsZero() {
continue
}
// instance must have a non-zero quantity
if resources.IsZero(it.Resources()[name]) {
return false
}
}
return true
}

func (n Node) String() string {
var requiredResources v1.ResourceList
if len(n.InstanceTypeOptions) == 0 {
requiredResources = resources.Merge(n.daemonResources, n.podResources)
} else {
requiredResources = resources.Merge(n.daemonResources, n.InstanceTypeOptions[0].Overhead(), n.podResources)
}

var itSb strings.Builder
for i, it := range n.InstanceTypeOptions {
// print the first 5 instance types only (indices 0-4)
Expand All @@ -163,5 +134,8 @@ func (n Node) String() string {
fmt.Fprint(&itSb, it.Name())
}

return fmt.Sprintf("with %d pods using resources %s from types %s", len(n.Pods), resources.String(requiredResources), itSb.String())
return fmt.Sprintf("with %d pods required %s pod resources and %s daemon resources from types %s", len(n.Pods),
resources.String(n.podResources),
resources.String(n.daemonResources),
itSb.String())
}
4 changes: 1 addition & 3 deletions pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"sort"
"time"

"knative.dev/pkg/logging"

Expand Down Expand Up @@ -65,7 +64,6 @@ func NewScheduler(kubeClient client.Client) *Scheduler {
func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) ([]*Node, error) {
defer metrics.Measure(schedulingDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))()
constraints := provisioner.Spec.Constraints.DeepCopy()
start := time.Now()

sort.Slice(pods, byCPUAndMemoryDescending(pods))
sort.Slice(instanceTypes, byPrice(instanceTypes))
Expand Down Expand Up @@ -103,7 +101,7 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner
}
}
}
logging.FromContext(ctx).Infof("Scheduled %d pods onto %d nodes in %s", len(pods), len(nodeSet.nodes), time.Since(start))
logging.FromContext(ctx).Infof("Scheduled %d pods onto %d nodes", len(pods), len(nodeSet.nodes))
return nodeSet.nodes, nil
}

Expand Down
31 changes: 12 additions & 19 deletions pkg/utils/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,43 @@ limitations under the License.
package resources

import (
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/aws/karpenter/pkg/utils/pretty"
)

// RequestsForPods returns the total resources of a variadic list of podspecs.
func RequestsForPods(pods ...*v1.Pod) v1.ResourceList {
resources := []v1.ResourceList{}
var resources []v1.ResourceList
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
resources = append(resources, container.Resources.Requests)
}
}
return Merge(resources...)
merged := Merge(resources...)
merged[v1.ResourcePods] = *resource.NewQuantity(int64(len(pods)), resource.DecimalExponent)
return merged
}

// LimitsForPods returns the total resources of a variadic list of podspecs
func LimitsForPods(pods ...*v1.Pod) v1.ResourceList {
resources := []v1.ResourceList{}
var resources []v1.ResourceList
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
resources = append(resources, container.Resources.Limits)
}
}
return Merge(resources...)
merged := Merge(resources...)
merged[v1.ResourcePods] = *resource.NewQuantity(int64(len(pods)), resource.DecimalExponent)
return merged
}

// Merge the resources from the variadic into a single v1.ResourceList
func Merge(resources ...v1.ResourceList) v1.ResourceList {
if len(resources) == 0 {
return v1.ResourceList{}
}
// reserve some capacity to avoid some re-allocations
result := make(v1.ResourceList, len(resources[0]))
for _, resourceList := range resources {
for resourceName, quantity := range resourceList {
Expand Down Expand Up @@ -80,16 +82,7 @@ func Cmp(lhs resource.Quantity, rhs resource.Quantity) int {
// String returns a string version of the resource list suitable for presenting in a log
func String(list v1.ResourceList) string {
if len(list) == 0 {
return "{none}"
}
var sb strings.Builder
sb.WriteByte('{')
for k, v := range list {
if sb.Len() > 1 {
sb.WriteByte(' ')
}
fmt.Fprintf(&sb, "%s: %s", k, v.String())
return "{}"
}
sb.WriteByte('}')
return sb.String()
return pretty.Concise(list)
}

0 comments on commit 0a44b01

Please sign in to comment.