Skip to content

Commit

Permalink
Add ProvisioningRequestProcessor (#6488)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk authored Feb 14, 2024
1 parent 947cd3f commit 5286b3f
Show file tree
Hide file tree
Showing 18 changed files with 454 additions and 81 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
Expand All @@ -53,6 +54,7 @@ type AutoscalerOptions struct {
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
Processors *ca_processors.AutoscalingProcessors
LoopStartNotifier *loopstart.ObserversList
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
opts.LoopStartNotifier,
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
Expand All @@ -101,6 +104,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions)
}
if opts.LoopStartNotifier == nil {
opts.LoopStartNotifier = loopstart.NewObserversList(nil)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
Expand Down Expand Up @@ -90,6 +91,7 @@ type StaticAutoscaler struct {
scaleDownActuator scaledown.Actuator
scaleUpOrchestrator scaleup.Orchestrator
processors *ca_processors.AutoscalingProcessors
loopStartNotifier *loopstart.ObserversList
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
taintConfig taints.TaintConfig
Expand Down Expand Up @@ -136,6 +138,7 @@ func NewStaticAutoscaler(
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *context.AutoscalingKubeClients,
processors *ca_processors.AutoscalingProcessors,
loopStartNotifier *loopstart.ObserversList,
cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy,
estimatorBuilder estimator.EstimatorBuilder,
Expand Down Expand Up @@ -205,6 +208,7 @@ func NewStaticAutoscaler(
scaleDownActuator: scaleDownActuator,
scaleUpOrchestrator: scaleUpOrchestrator,
processors: processors,
loopStartNotifier: loopStartNotifier,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
taintConfig: taintConfig,
Expand Down Expand Up @@ -337,6 +341,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to refresh cloud provider config: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
a.loopStartNotifier.Refresh()

// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
maxNodesCount := 0
Expand Down
11 changes: 11 additions & 0 deletions cluster-autoscaler/core/static_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
Expand Down Expand Up @@ -281,6 +282,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -374,6 +376,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
Expand Down Expand Up @@ -573,6 +576,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
Expand Down Expand Up @@ -798,6 +802,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
Expand Down Expand Up @@ -948,6 +953,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -1096,6 +1102,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -1224,6 +1231,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -1322,6 +1330,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -1427,6 +1436,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: NewTestProcessors(&context),
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down Expand Up @@ -2023,6 +2033,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
scaleDownActuator: actuator,
scaleDownPlanner: planner,
processors: NewTestProcessors(&ctx),
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

Expand Down
26 changes: 16 additions & 10 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand All @@ -49,6 +50,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
Expand Down Expand Up @@ -469,15 +471,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -487,14 +480,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
ScaleUpOrchestrator: orchestrator.New(),
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
if err != nil {
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
if err != nil {
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
40 changes: 40 additions & 0 deletions cluster-autoscaler/observers/loopstart/loopstart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loopstart

// Observer interface is used to store object that needed to be refreshed in each CA loop.
// It returns error and a bool value whether the loop should be skipped.
type Observer interface {
Refresh()
}

// ObserversList interface is used to store objects that needed to be refreshed in each CA loop.
type ObserversList struct {
observers []Observer
}

// Refresh refresh observers each CA loop.
func (l *ObserversList) Refresh() {
for _, observer := range l.observers {
observer.Refresh()
}
}

// NewObserversList return new ObserversList.
func NewObserversList(observers []Observer) *ObserversList {
return &ObserversList{observers}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

// ProvisioningRequestProcessor process ProvisioningRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process([]*provreqwrapper.ProvisioningRequest)
CleanUp()
}

type provisioningRequestClient interface {
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor is responsible for processing ProvisioningRequest for each ProvisioningClass
// every CA loop and updating conditions for expired ProvisioningRequests.
type CombinedProvReqProcessor struct {
client provisioningRequestClient
processors []ProvisioningRequestProcessor
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
}

// Refresh iterates over ProvisioningRequests and updates its conditions/state.
func (cp *CombinedProvReqProcessor) Refresh() {
provReqs, err := cp.client.ProvisioningRequests()
if err != nil {
klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
return
}
for _, p := range cp.processors {
p.Process(provReqs)
}
}

// CleanUp cleans up internal state
func (cp *CombinedProvReqProcessor) CleanUp() {}
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,8 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// CapacityFound indicates that all of the requested resources were
// fount in the cluster.
CapacityFound string = "CapacityFound"
// Expired indicates that the ProvisioningRequest had CapacityFound condition before
// and the reservation time is expired.
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
Expand Down
Loading

0 comments on commit 5286b3f

Please sign in to comment.