Skip to content

Commit

Permalink
BookCapacity for ProvisioningRequest pods
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Jun 12, 2024
1 parent 83db225 commit 7ac217c
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 54 deletions.
5 changes: 4 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)

err = a.processors.CapacityReservation.BookCapacity(a.AutoscalingContext)
if err != nil {
klog.Warningf("Failed to reserve capacity: %v", err)
}
preScaleUp := func() time.Time {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
Expand Down Expand Up @@ -195,6 +196,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
CapacityReservation: capacityreservation.DefaultCapacityReservationProcessor(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client)
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
if err != nil {
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
opts.Processors.CapacityReservation = provreqProcesor
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package capacityreservation

import (
"k8s.io/autoscaler/cluster-autoscaler/context"
)

// CapacityReservation is interface to reserve capacity in the cluster.
type CapacityReservation interface {
BookCapacity(ctx *context.AutoscalingContext) error
}

type NoOpCapacityReservation struct{}

func DefaultCapacityReservationProcessor() CapacityReservation {
return &NoOpCapacityReservation{}
}

func (c *NoOpCapacityReservation) BookCapacity(ctx *context.AutoscalingContext) error {
return nil
}
5 changes: 4 additions & 1 deletion cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
"k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
Expand Down Expand Up @@ -70,7 +71,8 @@ type AutoscalingProcessors struct {
// * scale-downs per nodegroup
// * scale-up failures per nodegroup
// * scale-down failures per nodegroup
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
CapacityReservation capacityreservation.CapacityReservation
}

// DefaultProcessors returns default set of processors.
Expand Down Expand Up @@ -101,6 +103,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
CapacityReservation: capacityreservation.DefaultCapacityReservationProcessor(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@ limitations under the License.
package provreq

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
Expand All @@ -36,15 +44,20 @@ const (
defaultMaxUpdated = 20
)

type injector interface {
TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error)
}

type provReqProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
}

// NewProvReqProcessor return ProvisioningRequestProcessor.
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)}
}

// Refresh implements loop.Observer interface and will be run at the start
Expand Down Expand Up @@ -108,5 +121,40 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques
}
}

// Cleanup cleans up internal state.
// CleanUp cleans up internal state
func (p *provReqProcessor) CleanUp() {}

// BookCapacity schedule fake pods for ProvisioningRequest that should have reserved capacity
// in the cluster.
func (p *provReqProcessor) BookCapacity(ctx *context.AutoscalingContext) error {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if !conditions.ShouldCapacityBeBooked(provReq) {
continue
}
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
if len(podsToCreate) == 0 {
return nil
}
// Scheduling the pods to reserve capacity for provisioning request.
if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
return err
}
return nil
}
84 changes: 83 additions & 1 deletion cluster-autoscaler/processors/provreq/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ limitations under the License.
package provreq

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"

"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
)

func TestProcess(t *testing.T) {
Expand Down Expand Up @@ -146,7 +153,7 @@ func TestProcess(t *testing.T) {
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
if len(test.conditions) == len(test.wantConditions) {
Expand All @@ -164,3 +171,78 @@ func TestProcess(t *testing.T) {
}
}
}

type fakeInjector struct {
pods []*apiv1.Pod
}

func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) {
f.pods = pods
return nil, 0, nil
}

func TestBookCapacity(t *testing.T) {
testCases := []struct {
name string
conditions []string
provReq *provreqwrapper.ProvisioningRequest
capacityIsBooked bool
}{
{
name: "ProvReq is new, check-capacity class",
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: false,
},
{
name: "ProvReq is Failed, best-effort-atomic class",
conditions: []string{v1beta1.Failed},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, unknown class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, capacity should be booked, check-capacity class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: true,
},
{
name: "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: true,
},
{
name: "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned, v1beta1.BookingExpired},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
test := test
injector := &fakeInjector{pods: []*apiv1.Pod{}}
for _, condition := range test.conditions {
conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now())
}

processor := &provReqProcessor{
now: func() time.Time { return time.Now() },
client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq),
maxUpdated: 20,
injector: injector,
}
ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil)
processor.BookCapacity(&ctx)
if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) {
t.Fail()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,14 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(

o.context.ClusterSnapshot.Fork()
defer o.context.ClusterSnapshot.Revert()
o.bookCapacity()

// unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
for _, provClass := range o.provisioningClasses {
Expand All @@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
return nil, nil
}

func (o *provReqOrchestrator) bookCapacity() error {
provReqs, err := o.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
}
if len(podsToCreate) == 0 {
return nil
}
// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
klog.Warningf("Error during capacity booking: %v", err)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) {
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "capacity in the cluster is booked",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq},
name: "capacity is there, check-capacity class",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
scaleUpResult: status.ScaleUpSuccessful,
},
{
name: "unsupported ProvisioningRequest is ignored",
Expand All @@ -211,12 +211,6 @@ func TestScaleUp(t *testing.T) {
provReqToScaleUp: atomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "some capacity is pre-booked, large atomic scale-up request doesn't fit",
provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq},
provReqToScaleUp: largeAtomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
},
{
name: "capacity is there, large atomic scale-up request doesn't require scale-up",
provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},
Expand Down

0 comments on commit 7ac217c

Please sign in to comment.