perf: increase scheduler performance (kubernetes-sigs#982)
tzneal authored Feb 14, 2024
1 parent 01c5b79 commit 686b75d
Showing 6 changed files with 103 additions and 12 deletions.
@@ -56,10 +56,22 @@ const PrintStats = false
//nolint:gosec
var r = rand.New(rand.NewSource(42))

// To run the benchmarks use:
// `go test -tags=test_performance -run=XXX -bench=.`
//
// To get statistically significant results for comparison, run the benchmarks several times
// and compare the old and new runs with benchstat.
// ```sh
//
// go test -tags=test_performance -run=XXX -bench=. -count=10 | tee /tmp/old
// # make your changes to the code
// go test -tags=test_performance -run=XXX -bench=. -count=10 | tee /tmp/new
// benchstat /tmp/old /tmp/new
//
// ```
func BenchmarkScheduling1(b *testing.B) {
benchmarkScheduler(b, 400, 1)
}

func BenchmarkScheduling50(b *testing.B) {
benchmarkScheduler(b, 400, 50)
}
@@ -102,7 +114,7 @@ func TestSchedulingProfile(t *testing.T) {
totalNodes := 0
var totalTime time.Duration
for _, instanceCount := range []int{400} {
for _, podCount := range []int{10, 100, 500, 1000, 1500, 2000, 2500} {
for _, podCount := range []int{10, 100, 500, 1000, 1500, 2000, 5000} {
start := time.Now()
res := testing.Benchmark(func(b *testing.B) { benchmarkScheduler(b, instanceCount, podCount) })
totalTime += time.Since(start) / time.Duration(res.N)
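The per-pod-count benchmarks above could also be expressed as sub-benchmarks, which makes it easier to filter with `-bench` and to compare individual cases by name with benchstat. A minimal sketch, not part of this commit, assuming the file's existing `benchmarkScheduler` helper and an `fmt` import:

```go
// Sketch only: groups the pod-count cases under one benchmark so each case
// gets its own name (e.g. BenchmarkScheduling/pods=50) in benchstat output.
func BenchmarkScheduling(b *testing.B) {
	for _, podCount := range []int{1, 50, 100, 500} {
		podCount := podCount // capture the loop variable for the closure
		b.Run(fmt.Sprintf("pods=%d", podCount), func(b *testing.B) {
			benchmarkScheduler(b, 400, podCount)
		})
	}
}
```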
23 changes: 20 additions & 3 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -21,13 +21,13 @@ import (
"fmt"
"math"

"k8s.io/apimachinery/pkg/api/errors"

"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/scheduling"
"sigs.k8s.io/karpenter/pkg/utils/functional"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

"k8s.io/apimachinery/pkg/api/errors"

"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -85,6 +85,19 @@ func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.C
return t, nil
}

// topologyError allows lazily generating the error string in the topology error. If a pod fails to schedule, most often
// we are only interested in the fact that it failed to schedule and not why.
type topologyError struct {
topology *TopologyGroup
podDomains *scheduling.Requirement
nodeDomains *scheduling.Requirement
}

func (t topologyError) Error() string {
return fmt.Sprintf("unsatisfiable topology constraint for %s, key=%s (counts = %s, podDomains = %v, nodeDomains = %v", t.topology.Type, t.topology.Key,
pretty.Map(t.topology.domains, 25), t.podDomains, t.nodeDomains)
}

// Update unregisters the pod as the owner of all affinities and then creates any new topologies based on the pod spec,
// registering the pod as the owner of all associated affinities, new or old. This allows Update() to be called after
// relaxation of a preference to properly break the topology <-> owner relationship so that the preferred topology will
@@ -165,7 +178,11 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling.
}
domains := topology.Get(p, podDomains, nodeDomains)
if domains.Len() == 0 {
return nil, fmt.Errorf("unsatisfiable topology constraint for %s, key=%s (counts = %s, podDomains = %v, nodeDomains = %v", topology.Type, topology.Key, pretty.Map(topology.domains, 5), podDomains, nodeDomains)
return nil, topologyError{
topology: topology,
podDomains: podDomains,
nodeDomains: nodeDomains,
}
}
requirements.Add(domains)
}
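The topologyError type above, like badKeyError later in this commit, applies a lazy-error pattern: the error value carries the raw data and only formats a message when Error() is called, so the common case of discarding a scheduling failure pays no fmt.Sprintf cost. A standalone sketch of the pattern with illustrative names (lazyError, lookupDomain) that are not part of the repo:

```go
package main

import "fmt"

// lazyError stores the raw inputs; the message is only built on demand.
type lazyError struct {
	key      string
	wanted   []string
	observed []string
}

func (e lazyError) Error() string {
	// This formatting and its allocations happen only if a caller reads the message.
	return fmt.Sprintf("key %s: %v not in %v", e.key, e.observed, e.wanted)
}

func lookupDomain(ok bool) error {
	if !ok {
		// Returning the struct is cheap: no string is formatted here.
		return lazyError{key: "topology.kubernetes.io/zone", wanted: []string{"zone-a"}, observed: []string{"zone-c"}}
	}
	return nil
}

func main() {
	// Hot path: callers usually only check err != nil, so Error() never runs.
	if err := lookupDomain(false); err != nil {
		fmt.Println(err) // formatting happens here, only when the message is printed
	}
}
```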
5 changes: 1 addition & 4 deletions pkg/controllers/provisioning/scheduling/topologygroup.go
@@ -256,10 +256,7 @@ func (t *TopologyGroup) nextDomainAntiAffinity(domains *scheduling.Requirement)
// list of domains. The use case where this optimization is really great is when we are launching nodes for
// a deployment of pods with self anti-affinity. The domains map here continues to grow, and we continue to
// fully scan it each iteration.
if len(t.emptyDomains) == 0 {
return options
}
for domain := range t.domains {
for domain := range t.emptyDomains {
if domains.Has(domain) && t.domains[domain] == 0 {
options.Insert(domain)
}
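The change above replaces a full scan of t.domains with a scan of only t.emptyDomains when looking for an anti-affinity placement. The general idea is to maintain a secondary set of zero-count domains alongside the counts, so the candidate list shrinks as pods are placed instead of being rescanned from scratch each time. A simplified sketch with made-up names (domainCounter, Record, NextEmpty), not the repo's types:

```go
// domainCounter tracks per-domain pod counts plus a set of domains that are
// still empty, so anti-affinity lookups touch only viable candidates.
type domainCounter struct {
	counts map[string]int32
	empty  map[string]struct{}
}

func newDomainCounter(domains ...string) *domainCounter {
	c := &domainCounter{counts: map[string]int32{}, empty: map[string]struct{}{}}
	for _, d := range domains {
		c.counts[d] = 0
		c.empty[d] = struct{}{}
	}
	return c
}

// Record notes a pod landing in domain and removes it from the empty set.
func (c *domainCounter) Record(domain string) {
	c.counts[domain]++
	delete(c.empty, domain)
}

// NextEmpty returns any domain whose count is still zero.
func (c *domainCounter) NextEmpty() (string, bool) {
	for d := range c.empty {
		return d, true
	}
	return "", false
}
```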
37 changes: 35 additions & 2 deletions pkg/scheduling/requirements.go
@@ -237,9 +237,38 @@ func labelHint(r Requirements, key string, allowedUndefined sets.Set[string]) st
return ""
}

// badKeyError allows lazily generating the error string in the case of a bad key error. When requirements fail
// to match, we are most often interested in the fact that they failed and not in why.
type badKeyError struct {
key string
incoming *Requirement
existing *Requirement
}

func (b badKeyError) Error() string {
return fmt.Sprintf("key %s, %s not in %s", b.key, b.incoming, b.existing)
}

// intersectKeys is much faster and allocates less than getting the two key sets separately and intersecting them
func (r Requirements) intersectKeys(rhs Requirements) sets.Set[string] {
smallest := r
largest := rhs
if len(smallest) > len(largest) {
smallest, largest = largest, smallest
}
keys := sets.Set[string]{}

for key := range smallest {
if _, ok := largest[key]; ok {
keys.Insert(key)
}
}
return keys
}

// Intersects returns errors if the requirements don't have overlapping values; undefined keys are allowed
func (r Requirements) Intersects(requirements Requirements) (errs error) {
for key := range r.Keys().Intersection(requirements.Keys()) {
for key := range r.intersectKeys(requirements) {
existing := r.Get(key)
incoming := requirements.Get(key)
// There must be some value, except
@@ -251,7 +280,11 @@ func (r Requirements) Intersects(requirements Requirements) (errs error) {
continue
}
}
errs = multierr.Append(errs, fmt.Errorf("key %s, %s not in %s", key, incoming, existing))
errs = multierr.Append(errs, badKeyError{
key: key,
incoming: incoming,
existing: existing,
})
}
}
return errs
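A hypothetical micro-benchmark for the new helper; it would have to live inside the scheduling package to reach the unexported intersectKeys, and it reuses the NewRequirements/NewRequirement constructors and the v1 import already present in requirements_test.go. Run it with `go test -bench=IntersectKeys -benchmem` and compare old vs. new with benchstat:

```go
func BenchmarkIntersectKeys(b *testing.B) {
	reqsA := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "a", "b", "c"))
	reqsB := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "d", "e", "f"))

	// Old path: materialize both key sets, then intersect them.
	b.Run("Keys().Intersection", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			_ = reqsA.Keys().Intersection(reqsB.Keys())
		}
	})
	// New path: iterate the smaller map and probe the larger one.
	b.Run("intersectKeys", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			_ = reqsA.intersectKeys(reqsB)
		}
	})
}
```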
32 changes: 32 additions & 0 deletions pkg/scheduling/requirements_test.go
@@ -17,10 +17,13 @@ limitations under the License.
package scheduling

import (
"os"
"runtime/pprof"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
)

@@ -654,3 +657,32 @@ func FuzzEditDistance(f *testing.F) {
editDistance(lhs, rhs)
})
}

// TestRequirementsProfile is used to gather profiling metrics; benchmarking is primarily done with standard
// Go benchmark functions.
// go test -tags=test_performance -run=RequirementsProfile
func TestRequirementsProfile(t *testing.T) {
cpuf, err := os.Create("requirements.cpuprofile")
if err != nil {
t.Fatalf("error creating CPU profile: %s", err)
}
lo.Must0(pprof.StartCPUProfile(cpuf))
defer pprof.StopCPUProfile()

heapf, err := os.Create("requirements.heapprofile")
if err != nil {
t.Fatalf("error creating heap profile: %s", err)
}
defer lo.Must0(pprof.WriteHeapProfile(heapf))

reqsA := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "a", "b", "c"))
reqsB := NewRequirements(NewRequirement("foo", v1.NodeSelectorOpIn, "d", "e", "f"))

for i := 0; i < 525000; i++ {
_ = reqsA.Intersects(reqsB)
_ = reqsA.Compatible(reqsB)
_ = reqsA.NodeSelectorRequirements()
_ = reqsA.Keys()
_ = reqsA.Values()
}
}
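Once this test has run, the generated files can be inspected with pprof in the usual way, for example `go tool pprof -top requirements.cpuprofile` for the hottest functions or `go tool pprof -http=:8080 requirements.heapprofile` for an interactive view (standard pprof flags assumed, not flags prescribed by this commit).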
2 changes: 1 addition & 1 deletion pkg/utils/resources/resources.go
@@ -221,7 +221,7 @@ func Cmp(lhs resource.Quantity, rhs resource.Quantity) int {
func Fits(candidate, total v1.ResourceList) bool {
// If any of the total resource values are negative then the resource will never fit
for _, quantity := range total {
if Cmp(resource.MustParse("0"), quantity) > 0 {
if Cmp(*resource.NewScaledQuantity(0, resource.Kilo), quantity) > 0 {
return false
}
}
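The one-line change above matters because Fits sits on the scheduler's hot path: resource.MustParse has to parse the string "0" on every call, while resource.NewScaledQuantity builds the zero Quantity directly. A micro-benchmark sketch of just that difference (package and benchmark names are illustrative, not from the repo):

```go
package resources_test

import (
	"testing"

	"k8s.io/apimachinery/pkg/api/resource"
)

// BenchmarkZeroViaMustParse constructs the zero quantity by parsing a string.
func BenchmarkZeroViaMustParse(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = resource.MustParse("0")
	}
}

// BenchmarkZeroViaNewScaledQuantity constructs it directly, skipping the parser.
func BenchmarkZeroViaNewScaledQuantity(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = *resource.NewScaledQuantity(0, resource.Kilo)
	}
}
```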
