-
Notifications
You must be signed in to change notification settings - Fork 4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5286b3f
commit ced6c9e
Showing
9 changed files
with
277 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
cluster-autoscaler/processors/provreq/provisioning_request_injector.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package provreq | ||
|
||
import ( | ||
"time" | ||
|
||
apiv1 "k8s.io/api/core/v1" | ||
apimeta "k8s.io/apimachinery/pkg/api/meta" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/autoscaler/cluster-autoscaler/context" | ||
"k8s.io/autoscaler/cluster-autoscaler/processors/pods" | ||
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" | ||
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" | ||
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" | ||
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/klog/v2" | ||
) | ||
|
||
const ( | ||
defaultRetryTime = 10 * time.Minute | ||
) | ||
|
||
// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list. | ||
type ProvisioningRequestPodsInjector struct { | ||
client provisioningRequestClient | ||
now func() time.Time | ||
} | ||
|
||
// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list. | ||
func (p *ProvisioningRequestPodsInjector) Process( | ||
_ *context.AutoscalingContext, | ||
unschedulablePods []*apiv1.Pod, | ||
) ([]*apiv1.Pod, error) { | ||
provReqs, err := p.client.ProvisioningRequests() | ||
if err != nil { | ||
return nil, err | ||
} | ||
for _, pr := range provReqs { | ||
conditions := pr.Conditions() | ||
if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) { | ||
continue | ||
} | ||
|
||
provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned) | ||
accepted := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned) | ||
|
||
//TODO(yaroslava): support exponential backoff | ||
// Inject pods if ProvReq is new or it has Provisioned == False condition more than defaultRetryTime | ||
if accepted == nil || provisioned != nil && | ||
provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.now()) { | ||
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr) | ||
if err != nil { | ||
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name()) | ||
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.now())) | ||
continue | ||
} | ||
unschedulablePods := append(unschedulablePods, provreqpods...) | ||
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.now())) | ||
return unschedulablePods, nil | ||
} | ||
} | ||
return unschedulablePods, nil | ||
} | ||
|
||
// CleanUp cleans up the processor's internal structures. | ||
func (p *ProvisioningRequestPodsInjector) CleanUp() {} | ||
|
||
// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor. | ||
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) { | ||
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &ProvisioningRequestPodsInjector{client: client, now: time.Now}, nil | ||
} |
128 changes: 128 additions & 0 deletions
128
cluster-autoscaler/processors/provreq/provisioning_request_injector_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package provreq | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
apimeta "k8s.io/apimachinery/pkg/api/meta" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" | ||
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" | ||
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" | ||
) | ||
|
||
func TestProvisioningRequestPodsInjector(t *testing.T) { | ||
now := time.Now() | ||
minAgo := now.Add(-1 * time.Minute) | ||
hourAgo := now.Add(-1 * time.Hour) | ||
|
||
accepted := metav1.Condition{ | ||
Type: v1beta1.Accepted, | ||
Status: metav1.ConditionTrue, | ||
LastTransitionTime: metav1.NewTime(minAgo), | ||
} | ||
failed := metav1.Condition{ | ||
Type: v1beta1.Failed, | ||
Status: metav1.ConditionTrue, | ||
LastTransitionTime: metav1.NewTime(hourAgo), | ||
} | ||
provisioned := metav1.Condition{ | ||
Type: v1beta1.Provisioned, | ||
Status: metav1.ConditionTrue, | ||
LastTransitionTime: metav1.NewTime(hourAgo), | ||
} | ||
notProvisioned := metav1.Condition{ | ||
Type: v1beta1.Provisioned, | ||
Status: metav1.ConditionFalse, | ||
LastTransitionTime: metav1.NewTime(hourAgo), | ||
} | ||
notProvisionedRecently := metav1.Condition{ | ||
Type: v1beta1.Provisioned, | ||
Status: metav1.ConditionFalse, | ||
LastTransitionTime: metav1.NewTime(minAgo), | ||
} | ||
|
||
podsA := 10 | ||
newProvReqA := testProvisioningRequestWithCondition("new", podsA) | ||
newAcceptedProvReqA := testProvisioningRequestWithCondition("new", podsA, accepted) | ||
|
||
podsB := 20 | ||
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, notProvisioned, accepted) | ||
provisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-and-accepted", podsB, provisioned, accepted) | ||
failedProvReq := testProvisioningRequestWithCondition("failed", podsA, failed) | ||
notProvisionedRecentlyProvReqB := testProvisioningRequestWithCondition("provisioned-false-recently-B", podsB, notProvisionedRecently) | ||
|
||
testCases := []struct { | ||
name string | ||
provReqs []*provreqwrapper.ProvisioningRequest | ||
wantUnscheduledPodCount int | ||
wantUpdatedConditionName string | ||
}{ | ||
{ | ||
name: "New ProvisioningRequest, pods are injected and Accepted condition is added", | ||
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB}, | ||
wantUnscheduledPodCount: podsA, | ||
wantUpdatedConditionName: newProvReqA.Name(), | ||
}, | ||
{ | ||
name: "New ProvisioningRequest, pods are injected and Accepted condition is updated", | ||
provReqs: []*provreqwrapper.ProvisioningRequest{newAcceptedProvReqA, provisionedAcceptedProvReqB}, | ||
wantUnscheduledPodCount: podsA, | ||
wantUpdatedConditionName: newAcceptedProvReqA.Name(), | ||
}, | ||
{ | ||
name: "Provisioned=False, pods are injected", | ||
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq}, | ||
wantUnscheduledPodCount: podsB, | ||
wantUpdatedConditionName: notProvisionedAcceptedProvReqB.Name(), | ||
}, | ||
{ | ||
name: "Provisioned=True, no pods are injected", | ||
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB}, | ||
}, | ||
} | ||
for _, tc := range testCases { | ||
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) | ||
injector := ProvisioningRequestPodsInjector{client, func() time.Time { return now }} | ||
getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{}) | ||
if err != nil { | ||
t.Errorf("%s failed: injector.Process return error %v", tc.name, err) | ||
} | ||
if len(getUnscheduledPods) != tc.wantUnscheduledPodCount { | ||
t.Errorf("%s failed: injector.Process return %d unscheduled pods, want %d", tc.name, len(getUnscheduledPods), tc.wantUnscheduledPodCount) | ||
} | ||
if tc.wantUpdatedConditionName == "" { | ||
continue | ||
} | ||
pr, _ := client.ProvisioningRequest("ns", tc.wantUpdatedConditionName) | ||
accepted := apimeta.FindStatusCondition(pr.Conditions(), v1beta1.Accepted) | ||
if accepted == nil || accepted.LastTransitionTime != metav1.NewTime(now) { | ||
t.Errorf("%s: injector.Process hasn't update accepted condition for ProvisioningRequest %s", tc.name, tc.wantUpdatedConditionName) | ||
} | ||
} | ||
|
||
} | ||
|
||
func testProvisioningRequestWithCondition(name string, podCount int, conditions ...metav1.Condition) *provreqwrapper.ProvisioningRequest { | ||
pr := provreqwrapper.BuildTestProvisioningRequest("ns", name, "10", "100", "", int32(podCount), false, time.Now(), "ProvisioningClass") | ||
pr.V1Beta1().Status.Conditions = conditions | ||
return pr | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.