Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ProvisioningRequest injector #6529

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster-autoscaler/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module k8s.io/autoscaler/cluster-autoscaler

go 1.21
go 1.21.3

require (
cloud.google.com/go/compute/metadata v0.2.3
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
}
podListProcessor.AddProcessor(injector)
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
101 changes: 101 additions & 0 deletions cluster-autoscaler/processors/provreq/provisioning_request_injector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

// defaultRetryTime is how long a ProvisioningRequest must have had
// Provisioned == False before Process injects its pods again.
const defaultRetryTime = 10 * time.Minute

// SupportedProvisioningClasses lists the ProvisioningClasses that ClusterAutoscaler supports.
var SupportedProvisioningClasses = []string{
	v1beta1.ProvisioningClassCheckCapacity,
}

// ProvisioningRequestPodsInjector creates in-memory pods from a ProvisioningRequest
// and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
// client is used by Process to list ProvisioningRequests.
client provisioningRequestClient
// clock supplies the current time for the retry-window check and for the
// timestamps written into updated conditions.
clock clock.PassiveClock
}

// Process picks at most one eligible ProvisioningRequest, sets its Accepted
// condition and appends its in-memory pods to the unschedulable pods list.
//
// A ProvisioningRequest is eligible when it is neither Failed nor Provisioned
// and either has no Provisioned condition at all, or has had
// Provisioned == False for longer than defaultRetryTime. If building the pods
// for an eligible request fails, the request is marked Failed and the next one
// is considered. Only the first request whose pods are built successfully is
// injected per call; when there is none, the input list is returned unchanged.
//
// A non-nil error is returned only when listing ProvisioningRequests fails.
func (p *ProvisioningRequestPodsInjector) Process(
	_ *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
	provReqs, err := p.client.ProvisioningRequests()
	if err != nil {
		return nil, err
	}
	for _, pr := range provReqs {
		conditions := pr.Conditions()
		if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) {
			continue
		}

		// TODO(yaroslava): support exponential backoff
		// A request without a Provisioned condition was never scaled up and is
		// injected right away. Otherwise it is retried only once it has been
		// Provisioned == False for more than defaultRetryTime; any other state
		// (e.g. Unknown, or a recent False) is skipped.
		if provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned); provisioned != nil {
			retryDue := provisioned.Status == metav1.ConditionFalse &&
				provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now())
			if !retryDue {
				continue
			}
		}

		// The result gets its own name so it does not shadow the provreqpods package.
		injectedPods, err := provreqpods.PodsForProvisioningRequest(pr)
		if err != nil {
			// Include the underlying error so the log alone explains the failure.
			klog.Errorf("Failed to get pods for ProvisioningRequest %v: %v", pr.Name(), err)
			provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now()))
			continue
		}
		// Assign to the parameter instead of shadowing it with a new variable.
		unschedulablePods = append(unschedulablePods, injectedPods...)
		provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
		return unschedulablePods, nil
	}
	return unschedulablePods, nil
}

// CleanUp is a no-op for this processor; its body is intentionally empty.
func (p *ProvisioningRequestPodsInjector) CleanUp() {
}

// NewProvisioningRequestPodsInjector builds a ProvisioningRequestPodsInjector backed by a
// ProvisioningRequest client created from kubeConfig and by the real wall clock.
// It returns an error if the client cannot be created.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
	provReqClient, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
	if err != nil {
		return nil, err
	}
	injector := &ProvisioningRequestPodsInjector{
		client: provReqClient,
		clock:  clock.RealClock{},
	}
	return injector, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"context"
"testing"
"time"

v1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
clock "k8s.io/utils/clock/testing"
)

// TestProvisioningRequestPodsInjector checks, for several combinations of
// ProvisioningRequest conditions, how many pods Process injects and whether the
// Accepted condition of the picked ProvisioningRequest is stamped with the
// injector's current time.
func TestProvisioningRequestPodsInjector(t *testing.T) {
	now := time.Now()
	minAgo := now.Add(-1 * time.Minute)
	hourAgo := now.Add(-1 * time.Hour)

	accepted := metav1.Condition{
		Type:               v1beta1.Accepted,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.NewTime(minAgo),
	}
	failed := metav1.Condition{
		Type:               v1beta1.Failed,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.NewTime(hourAgo),
	}
	provisioned := metav1.Condition{
		Type:               v1beta1.Provisioned,
		Status:             metav1.ConditionTrue,
		LastTransitionTime: metav1.NewTime(hourAgo),
	}
	notProvisioned := metav1.Condition{
		Type:               v1beta1.Provisioned,
		Status:             metav1.ConditionFalse,
		LastTransitionTime: metav1.NewTime(hourAgo),
	}
	unknownProvisioned := metav1.Condition{
		Type:               v1beta1.Provisioned,
		Status:             metav1.ConditionUnknown,
		LastTransitionTime: metav1.NewTime(hourAgo),
	}
	notProvisionedRecently := metav1.Condition{
		Type:               v1beta1.Provisioned,
		Status:             metav1.ConditionFalse,
		LastTransitionTime: metav1.NewTime(minAgo),
	}

	podsA := 10
	newProvReqA := testProvisioningRequestWithCondition("new", podsA)
	newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, accepted)

	podsB := 20
	notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, notProvisioned, accepted)
	provisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-and-accepted", podsB, provisioned, accepted)
	failedProvReq := testProvisioningRequestWithCondition("failed", podsA, failed)
	notProvisionedRecentlyProvReqB := testProvisioningRequestWithCondition("provisioned-false-recently-B", podsB, notProvisionedRecently)
	unknownProvisionedProvReqB := testProvisioningRequestWithCondition("provisioned-unknown-B", podsB, unknownProvisioned)

	testCases := []struct {
		name     string
		provReqs []*provreqwrapper.ProvisioningRequest
		// wantUnscheduledPodCount is the expected length of the list returned by Process.
		wantUnscheduledPodCount int
		// wantUpdatedConditionName names the ProvisioningRequest whose Accepted condition
		// must be updated; empty means no condition check is performed.
		wantUpdatedConditionName string
	}{
		{
			name:                     "New ProvisioningRequest, pods are injected and Accepted condition is added",
			provReqs:                 []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
			wantUnscheduledPodCount:  podsA,
			wantUpdatedConditionName: newProvReqA.Name(),
		},
		{
			name:                     "New ProvisioningRequest, pods are injected and Accepted condition is updated",
			provReqs:                 []*provreqwrapper.ProvisioningRequest{newAcceptedProvReqA, provisionedAcceptedProvReqB},
			wantUnscheduledPodCount:  podsA,
			wantUpdatedConditionName: newAcceptedProvReqA.Name(),
		},
		{
			name:                     "Provisioned=False, pods are injected",
			provReqs:                 []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
			wantUnscheduledPodCount:  podsB,
			wantUpdatedConditionName: notProvisionedAcceptedProvReqB.Name(),
		},
		{
			name:     "Provisioned=True, no pods are injected",
			provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
		},
		{
			name:     "Provisioned=Unknown, no pods are injected",
			provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
		},
	}
	for _, tc := range testCases {
		// Each case runs as a subtest so failures are reported per case and one
		// case's fatal error does not stop the others.
		t.Run(tc.name, func(t *testing.T) {
			client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
			injector := ProvisioningRequestPodsInjector{client: client, clock: clock.NewFakePassiveClock(now)}
			getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{})
			if err != nil {
				t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
			}
			if len(getUnscheduledPods) != tc.wantUnscheduledPodCount {
				t.Errorf("%s failed: injector.Process return %d unscheduled pods, want %d", tc.name, len(getUnscheduledPods), tc.wantUnscheduledPodCount)
			}
			if tc.wantUpdatedConditionName == "" {
				return
			}
			// Fail on a lookup error instead of dereferencing a possibly nil result.
			pr, err := client.ProvisioningRequest("ns", tc.wantUpdatedConditionName)
			if err != nil {
				t.Fatalf("%s: failed to get ProvisioningRequest %s: %v", tc.name, tc.wantUpdatedConditionName, err)
			}
			gotAccepted := apimeta.FindStatusCondition(pr.Conditions(), v1beta1.Accepted)
			// Compare instants with Equal; != on a struct wrapping time.Time also compares
			// the location and monotonic clock reading.
			if gotAccepted == nil || !gotAccepted.LastTransitionTime.Time.Equal(now) {
				t.Errorf("%s: injector.Process hasn't update accepted condition for ProvisioningRequest %s", tc.name, tc.wantUpdatedConditionName)
			}
		})
	}
}

// testProvisioningRequestWithCondition builds a test ProvisioningRequest named name in
// namespace "ns" for podCount pods and sets its status conditions to the given ones.
func testProvisioningRequestWithCondition(name string, podCount int, conditions ...metav1.Condition) *provreqwrapper.ProvisioningRequest {
	provReq := provreqwrapper.BuildTestProvisioningRequest(
		"ns", name, "10", "100", "", int32(podCount), false, time.Now(), "ProvisioningClass",
	)
	provReq.V1Beta1().Status.Conditions = conditions
	return provReq
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,14 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// Accepted indicates that the ProvisioningRequest was accepted by ClusterAutoscaler,
// so ClusterAutoscaler will attempt to provision the nodes for it.
Accepted string = "Accepted"
yaroslava-serdiuk marked this conversation as resolved.
Show resolved Hide resolved
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// CapacityRevoked indicates that requested resources are no longer valid.
CapacityRevoked string = "CapacityRevoked"
yaroslava-serdiuk marked this conversation as resolved.
Show resolved Hide resolved
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
// VM creation finishes successfully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
Expand Down Expand Up @@ -121,13 +122,13 @@ func (o *provReqOrchestrator) bookCapacity() error {
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if shouldCapacityBeBooked(provReq) {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
continue
}
podsToCreate = append(podsToCreate, pods...)
Expand All @@ -151,10 +152,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err
}
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
return false, err
}
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
return true, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

Expand Down Expand Up @@ -75,14 +76,14 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
updated++
}
}
Expand Down
Loading
Loading