From 5f94f2c42957448e3fdeb287ed0415c115b547c2 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Wed, 17 Apr 2024 19:37:54 +0300 Subject: [PATCH] Add provreqOrchestrator that handle ProvReq classes (#6627) * Add provreqOrchestrator that handle ProvReq classes * Review remarks * Review remarks --- cluster-autoscaler/main.go | 5 +- .../provreq/provisioning_request_injector.go | 2 +- .../provreq/provisioning_request_processor.go | 7 +- .../checkcapacity/orchestrator.go | 180 ------------------ .../checkcapacity/provisioningclass.go | 104 ++++++++++ .../orchestrator/orchestrator.go | 148 ++++++++++++++ .../orchestrator_test.go | 20 +- .../orchestrator/wrapper_orchestrator.go | 23 +-- .../orchestrator/wrapper_orchestrator_test.go | 4 +- .../provreqclient/client.go | 26 +++ .../provreqclient/client_test.go | 67 +++++++ cluster-autoscaler/utils/errors/errors.go | 3 + 12 files changed, 375 insertions(+), 214 deletions(-) delete mode 100644 cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go create mode 100644 cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go create mode 100644 cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go rename cluster-autoscaler/provisioningrequest/{checkcapacity => orchestrator}/orchestrator_test.go (89%) rename cluster-autoscaler/{core/scaleup => provisioningrequest}/orchestrator/wrapper_orchestrator.go (83%) rename cluster-autoscaler/{core/scaleup => provisioningrequest}/orchestrator/wrapper_orchestrator_test.go (96%) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index f3bd86653855..74e0b01a8409 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -60,6 +60,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" + provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/simulator/options" @@ -494,10 +495,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig) + provreqOrchestrator, err := provreqorchestrator.New(restConfig) if err != nil { return nil, err } + scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) + opts.ScaleUpOrchestrator = scaleUpOrchestrator provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()}) if err != nil { diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go index a44bd5d336e0..2c8394846a98 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go @@ -42,7 +42,7 @@ var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapaci // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list. type ProvisioningRequestPodsInjector struct { - client provisioningRequestClient + client *provreqclient.ProvisioningRequestClient clock clock.PassiveClock } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go index 0085c026e365..06cb7af6b546 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go @@ -30,15 +30,10 @@ type ProvisioningRequestProcessor interface { CleanUp() } -type provisioningRequestClient interface { - ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) - ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) -} - // CombinedProvReqProcessor is responsible for processing ProvisioningRequest for each ProvisioningClass // every CA loop and updating conditions for expired ProvisioningRequests. type CombinedProvReqProcessor struct { - client provisioningRequestClient + client *provreqclient.ProvisioningRequestClient processors []ProvisioningRequestProcessor } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go deleted file mode 100644 index 0d8ef058d1eb..000000000000 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go +++ /dev/null @@ -1,180 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package checkcapacity - -import ( - "fmt" - - appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/estimator" - "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" - provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - - ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" - schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" -) - -type provisioningRequestClient interface { - ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) - ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) -} - -type provReqOrchestrator struct { - initialized bool - context *context.AutoscalingContext - client provisioningRequestClient - injector *scheduling.HintingSimulator -} - -// New return new orchestrator. -func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) { - client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) - if err != nil { - return nil, err - } - - return &provReqOrchestrator{client: client}, nil -} - -// Initialize initialize orchestrator. -func (o *provReqOrchestrator) Initialize( - autoscalingContext *context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - clusterStateRegistry *clusterstate.ClusterStateRegistry, - estimatorBuilder estimator.EstimatorBuilder, - taintConfig taints.TaintConfig, -) { - o.initialized = true - o.context = autoscalingContext - o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker) -} - -// ScaleUp return if there is capacity in the cluster for pods from ProvisioningRequest. -func (o *provReqOrchestrator) ScaleUp( - unschedulablePods []*apiv1.Pod, - nodes []*apiv1.Node, - daemonSets []*appsv1.DaemonSet, - nodeInfos map[string]*schedulerframework.NodeInfo, -) (*status.ScaleUpStatus, errors.AutoscalerError) { - if !o.initialized { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) - } - if len(unschedulablePods) == 0 { - return &status.ScaleUpStatus{}, nil - } - if _, err := o.verifyProvisioningRequestClass(unschedulablePods); err != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error())) - } - - o.context.ClusterSnapshot.Fork() - defer o.context.ClusterSnapshot.Revert() - if err := o.bookCapacity(); err != nil { - return nil, errors.NewAutoscalerError(errors.InternalError, err.Error()) - } - scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods) - if err != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error())) - } - if scaleUpIsSuccessful { - return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil - } - return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil -} - -// ScaleUpToNodeGroupMinSize is no-op. -func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( - nodes []*apiv1.Node, - nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) { - return nil, nil -} - -func (o *provReqOrchestrator) bookCapacity() error { - provReqs, err := o.client.ProvisioningRequests() - if err != nil { - return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err) - } - podsToCreate := []*apiv1.Pod{} - for _, provReq := range provReqs { - if conditions.ShouldCapacityBeBooked(provReq) { - pods, err := provreq_pods.PodsForProvisioningRequest(provReq) - if err != nil { - // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. - // If there is an error, mark PR as invalid, because we won't be able to book capacity - // for it anyway. - conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) - continue - } - podsToCreate = append(podsToCreate, pods...) - } - } - if len(podsToCreate) == 0 { - return nil - } - // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition - if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { - klog.Warningf("Error during capacity booking: %v", err) - } - return nil -} - -// Assuming that all unschedulable pods comes from one ProvisioningRequest. -func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) { - provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) - if err != nil { - return false, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) - } - st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) - if len(st) < len(unschedulablePods) || err != nil { - conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) - return false, err - } - conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now()) - return true, nil -} - -// verifyPods check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class. -func (o *provReqOrchestrator) verifyProvisioningRequestClass(unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) { - provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) - if err != nil { - return nil, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) - } - if provReq.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity { - return nil, fmt.Errorf("ProvisioningRequestClass is not %s", v1beta1.ProvisioningClassCheckCapacity) - } - for _, pod := range unschedulablePods { - if pod.Namespace != unschedulablePods[0].Namespace { - return nil, fmt.Errorf("Pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name) - } - if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name { - return nil, fmt.Errorf("Pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name) - } - } - return provReq, nil -} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go new file mode 100644 index 000000000000..ad03b1656f85 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go @@ -0,0 +1,104 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type checkCapacityProvClass struct { + context *context.AutoscalingContext + client *provreqclient.ProvisioningRequestClient + injector *scheduling.HintingSimulator +} + +// New create check-capacity scale-up mode. +func New( + client *provreqclient.ProvisioningRequestClient, +) *checkCapacityProvClass { + return &checkCapacityProvClass{client: client} +} + +func (o *checkCapacityProvClass) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + estimatorBuilder estimator.EstimatorBuilder, + taintConfig taints.TaintConfig, + injector *scheduling.HintingSimulator, +) { + o.context = autoscalingContext + o.injector = injector +} + +// Provision return if there is capacity in the cluster for pods from ProvisioningRequest. +func (o *checkCapacityProvClass) Provision( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + if len(unschedulablePods) == 0 { + return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil + } + pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods) + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error())) + } + if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity { + return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil + } + + o.context.ClusterSnapshot.Fork() + defer o.context.ClusterSnapshot.Revert() + + scaleUpIsSuccessful, err := o.checkcapacity(unschedulablePods, pr) + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error())) + } + if scaleUpIsSuccessful { + return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil + } + return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil +} + +// Assuming that all unschedulable pods comes from one ProvisioningRequest. +func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (bool, error) { + st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) + if len(st) < len(unschedulablePods) || err != nil { + conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) + return false, err + } + conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now()) + return true, nil +} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go new file mode 100644 index 000000000000..d4d31ff05528 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -0,0 +1,148 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" + provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type provisioningClass interface { + Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet, + map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError) + Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry, + estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator) +} + +// provReqOrchestrator is an orchestrator that contains orchestrators for all supported Provisioning Classes. +type provReqOrchestrator struct { + initialized bool + context *context.AutoscalingContext + client *provreqclient.ProvisioningRequestClient + injector *scheduling.HintingSimulator + provisioningClasses []provisioningClass +} + +// New return new orchestrator. +func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) { + client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) + if err != nil { + return nil, err + } + + return &provReqOrchestrator{client: client, provisioningClasses: []provisioningClass{checkcapacity.New(client)}}, nil +} + +// Initialize initialize orchestrator. +func (o *provReqOrchestrator) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + estimatorBuilder estimator.EstimatorBuilder, + taintConfig taints.TaintConfig, +) { + o.initialized = true + o.context = autoscalingContext + o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker) + for _, mode := range o.provisioningClasses { + mode.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector) + } +} + +// ScaleUp run ScaleUp for each Provisionining Class. As of now, CA pick one ProvisioningRequest, +// so only one ProvisioningClass return non empty scaleUp result. +// In case we implement multiple ProvisioningRequest ScaleUp, the function should return combined status +func (o *provReqOrchestrator) ScaleUp( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { + if !o.initialized { + return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized")) + } + + o.context.ClusterSnapshot.Fork() + defer o.context.ClusterSnapshot.Revert() + o.bookCapacity() + + // unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp. + for _, provClass := range o.provisioningClasses { + st, err := provClass.Provision(unschedulablePods, nodes, daemonSets, nodeInfos) + if err != nil || st != nil && st.Result != status.ScaleUpNotTried { + return st, err + } + } + return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil +} + +// ScaleUpToNodeGroupMinSize doesn't have implementation for ProvisioningRequest Orchestrator. +func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( + nodes []*apiv1.Node, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { + return nil, nil +} + +func (o *provReqOrchestrator) bookCapacity() error { + provReqs, err := o.client.ProvisioningRequests() + if err != nil { + return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err) + } + podsToCreate := []*apiv1.Pod{} + for _, provReq := range provReqs { + if conditions.ShouldCapacityBeBooked(provReq) { + pods, err := provreq_pods.PodsForProvisioningRequest(provReq) + if err != nil { + // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. + // If there is an error, mark PR as invalid, because we won't be able to book capacity + // for it anyway. + conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) + continue + } + podsToCreate = append(podsToCreate, pods...) + } + } + if len(podsToCreate) == 0 { + return nil + } + // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition + if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { + klog.Warningf("Error during capacity booking: %v", err) + } + return nil +} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go similarity index 89% rename from cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go rename to cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 33184d69f28c..5d2bd3bf0249 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package checkcapacity +package orchestrator import ( "context" @@ -31,11 +31,12 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config" . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -68,8 +69,9 @@ func TestScaleUp(t *testing.T) { err bool }{ { - name: "no ProvisioningRequests", - provReqs: []*provreqwrapper.ProvisioningRequest{}, + name: "no ProvisioningRequests", + provReqs: []*provreqwrapper.ProvisioningRequest{}, + scaleUpResult: status.ScaleUpNotTried, }, { name: "one ProvisioningRequest", @@ -87,7 +89,7 @@ func TestScaleUp(t *testing.T) { name: "pods from different ProvisioningRequest class", provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass}, provReqToScaleUp: differentProvReqClass, - err: true, + scaleUpResult: status.ScaleUpNotTried, }, { name: "some capacity is booked, succesfull ScaleUp", @@ -107,12 +109,12 @@ func TestScaleUp(t *testing.T) { clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil) prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp) assert.NoError(t, err) + client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) orchestrator := &provReqOrchestrator{ - initialized: true, - context: &autoscalingContext, - client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...), - injector: scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker), + client: client, + provisioningClasses: []provisioningClass{checkcapacity.New(client)}, } + orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{}) st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}) if !tc.err { assert.NoError(t, err) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go similarity index 83% rename from cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go rename to cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go index 01991b5da2e8..cda3bc297d70 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go @@ -17,21 +17,18 @@ limitations under the License. package orchestrator import ( - "fmt" - appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/client-go/rest" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -41,20 +38,16 @@ import ( type WrapperOrchestrator struct { // scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present. scaleUpRegularPods bool - scaleUpOrchestrator scaleup.Orchestrator + podsOrchestrator scaleup.Orchestrator provReqOrchestrator scaleup.Orchestrator } // NewWrapperOrchestrator return WrapperOrchestrator -func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) { - provReqOrchestrator, err := checkcapacity.New(kubeConfig) - if err != nil { - return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err) - } +func NewWrapperOrchestrator(provReqOrchestrator scaleup.Orchestrator) *WrapperOrchestrator { return &WrapperOrchestrator{ - scaleUpOrchestrator: New(), + podsOrchestrator: orchestrator.New(), provReqOrchestrator: provReqOrchestrator, - }, nil + } } // Initialize initializes the orchestrator object with required fields. @@ -65,7 +58,7 @@ func (o *WrapperOrchestrator) Initialize( estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, ) { - o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) + o.podsOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) } @@ -86,7 +79,7 @@ func (o *WrapperOrchestrator) ScaleUp( } if o.scaleUpRegularPods { - return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) + return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) } return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos) } @@ -110,5 +103,5 @@ func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize( nodes []*apiv1.Node, nodeInfos map[string]*schedulerframework.NodeInfo, ) (*status.ScaleUpStatus, errors.AutoscalerError) { - return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) + return o.podsOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go similarity index 96% rename from cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go rename to cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go index 521b1f362599..1e28b8f10779 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go @@ -39,10 +39,10 @@ const ( regularPodsErrorMsg = "regularPodsError" ) -func TestScaleUp(t *testing.T) { +func TestWrapperScaleUp(t *testing.T) { o := WrapperOrchestrator{ provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg}, - scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg}, + podsOrchestrator: &fakeScaleUp{regularPodsErrorMsg}, } regularPods := []*apiv1.Pod{ BuildTestPod("pod-1", 1, 100), diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go b/cluster-autoscaler/provisioningrequest/provreqclient/client.go index 7bf75d6e3eff..9d135bb3f60f 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go @@ -157,3 +157,29 @@ func newPodTemplatesLister(client *kubernetes.Clientset, stopChannel <-chan stru klog.V(2).Info("Successful initial Pod Template sync") return podTemplLister, nil } + +// ProvisioningRequestForPods check that all pods belong to one ProvisioningRequest and return it. +func ProvisioningRequestForPods(client *ProvisioningRequestClient, unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) { + if len(unschedulablePods) == 0 { + return nil, fmt.Errorf("empty unschedulablePods list") + } + if unschedulablePods[0].OwnerReferences == nil || len(unschedulablePods[0].OwnerReferences) == 0 { + return nil, fmt.Errorf("pod %s has no OwnerReference", unschedulablePods[0].Name) + } + provReq, err := client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) + if err != nil { + return nil, fmt.Errorf("failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) + } + for _, pod := range unschedulablePods { + if pod.Namespace != unschedulablePods[0].Namespace { + return nil, fmt.Errorf("pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name) + } + if pod.OwnerReferences == nil || len(pod.OwnerReferences) == 0 { + return nil, fmt.Errorf("pod %s has no OwnerReference", pod.Name) + } + if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name { + return nil, fmt.Errorf("pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name) + } + } + return provReq, nil +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go index 5f5acc057d4c..15f9e10c3928 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go @@ -19,9 +19,15 @@ package provreqclient import ( "context" "testing" + "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" ) func TestFetchPodTemplates(t *testing.T) { @@ -42,3 +48,64 @@ func TestFetchPodTemplates(t *testing.T) { t.Errorf("Template mismatch, diff (-want +got):\n%s", diff) } } + +func TestProvisioningRequestForPods(t *testing.T) { + checkCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "check-capacity", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + customProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "custom", "1m", "100", "", int32(100), false, time.Now(), "custom") + checkCapacityPods, _ := pods.PodsForProvisioningRequest(checkCapacityProvReq) + customProvReqPods, _ := pods.PodsForProvisioningRequest(customProvReq) + regularPod := BuildTestPod("p1", 600, 100) + client := NewFakeProvisioningRequestClient(context.Background(), t, checkCapacityProvReq, customProvReq) + testCases := []struct { + name string + pods []*apiv1.Pod + className string + err bool + pr *provreqwrapper.ProvisioningRequest + }{ + { + name: "no pods", + pods: []*apiv1.Pod{}, + className: "some-class", + err: true, + }, + { + name: "pods from one Provisioning Class", + pods: checkCapacityPods, + className: v1beta1.ProvisioningClassCheckCapacity, + pr: checkCapacityProvReq, + }, + { + name: "pods from different Provisioning Classes", + pods: append(checkCapacityPods, customProvReqPods...), + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + { + name: "regular pod", + pods: []*apiv1.Pod{regularPod}, + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + { + name: "provreq pods and regular pod", + pods: append(checkCapacityPods, regularPod), + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pr, err := ProvisioningRequestForPods(client, tc.pods) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, pr, tc.pr) + assert.Equal(t, pr.Spec.ProvisioningClassName, tc.className) + } + }) + } +} diff --git a/cluster-autoscaler/utils/errors/errors.go b/cluster-autoscaler/utils/errors/errors.go index 2276e53f3ee9..58066e8ed939 100644 --- a/cluster-autoscaler/utils/errors/errors.go +++ b/cluster-autoscaler/utils/errors/errors.go @@ -81,6 +81,9 @@ func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerErr if e, ok := err.(AutoscalerError); ok { return e } + if err == nil { + return nil + } return NewAutoscalerError(defaultType, "%v", err) }