Skip to content

Commit

Permalink
Change garbage collector behaviour, by only removing
Browse files Browse the repository at this point in the history
AggregateCollectionsStates for which corresponding owner controller
doesn't exist anymore.
  • Loading branch information
piotrnosek committed Dec 21, 2021
1 parent a8228b3 commit d03624b
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 46 deletions.
76 changes: 59 additions & 17 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
labels "k8s.io/apimachinery/pkg/labels"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher"
vpa_utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -348,15 +349,22 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
return aggregateContainerState
}

func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time) {
// garbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more contributive pods - a pod is contributive in any of following situations:
// a) It is in an active state - i.e. not PodSucceeded nor PodFailed.
// b) Its associated controller (e.g. Deployment) still exists.
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time, controllerFetcher controllerfetcher.ControllerFetcher) {
klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered")
keysToDelete := make([]AggregateStateKey, 0)
activeKeys := cluster.getActiveAggregateStateKeys()
contributiveKeys := cluster.getContributiveAggregateStateKeys(controllerFetcher)
for key, aggregateContainerState := range cluster.aggregateStateMap {
isKeyActive := activeKeys[key]
if !isKeyActive && aggregateContainerState.isEmpty() {
isKeyContributive := contributiveKeys[key]
if !isKeyContributive && aggregateContainerState.isEmpty() {
keysToDelete = append(keysToDelete, key)
klog.V(1).Infof("Removing empty and inactive AggregateCollectionState for %+v", key)
klog.V(1).Infof("Removing empty and not contributive AggregateCollectionState for %+v", key)
continue
}
if aggregateContainerState.isExpired(now) {
Expand All @@ -375,29 +383,34 @@ func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Ti
// RateLimitedGarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// It performs clean up only if more than `gcInterval` passed since the last time it performed a clean up.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 1) It has no samples and there are no more contributive pods - a pod is contributive in any of following situations:
// a) It is in an active state - i.e. not PodSucceeded nor PodFailed.
// b) Its associated controller (e.g. Deployment) still exists.
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time) {
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time, controllerFetcher controllerfetcher.ControllerFetcher) {
if now.Sub(cluster.lastAggregateContainerStateGC) < cluster.gcInterval {
return
}
cluster.garbageCollectAggregateCollectionStates(now)
cluster.garbageCollectAggregateCollectionStates(now, controllerFetcher)
cluster.lastAggregateContainerStateGC = now
}

func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool {
activeKeys := map[AggregateStateKey]bool{}
func (cluster *ClusterState) getContributiveAggregateStateKeys(controllerFetcher controllerfetcher.ControllerFetcher) map[AggregateStateKey]bool {
contributiveKeys := map[AggregateStateKey]bool{}
for _, pod := range cluster.Pods {
// Pods that will not run anymore are considered inactive.
if pod.Phase == apiv1.PodSucceeded || pod.Phase == apiv1.PodFailed {
continue
}
for container := range pod.Containers {
activeKeys[cluster.MakeAggregateStateKey(pod, container)] = true
// Pod is considered contributive in any of following situations:
// 1) It is in active state - i.e. not PodSucceeded nor PodFailed.
// 2) Its associated controller (e.g. Deployment) still exists.
podControllerExists := cluster.GetControllerForPodUnderVPA(pod, controllerFetcher) != nil
podActive := pod.Phase != apiv1.PodSucceeded && pod.Phase != apiv1.PodFailed
if podActive || podControllerExists {
for container := range pod.Containers {
contributiveKeys[cluster.MakeAggregateStateKey(pod, container)] = true
}
}
}
return activeKeys
return contributiveKeys
}

// RecordRecommendation marks the state of recommendation in the cluster. We
Expand Down Expand Up @@ -433,6 +446,35 @@ func (cluster *ClusterState) GetMatchingPods(vpa *Vpa) []PodID {
return matchingPods
}

// GetControllerForPodUnderVPA returns controller associated with given Pod. Returns nil if Pod is not controlled by a VPA object.
func (cluster *ClusterState) GetControllerForPodUnderVPA(pod *PodState, controllerFetcher controllerfetcher.ControllerFetcher) *controllerfetcher.ControllerKeyWithAPIVersion {
controllingVPA := cluster.GetControllingVPA(pod)
if controllingVPA != nil {
controller := &controllerfetcher.ControllerKeyWithAPIVersion{
ControllerKey: controllerfetcher.ControllerKey{
Namespace: controllingVPA.ID.Namespace,
Kind: controllingVPA.TargetRef.Kind,
Name: controllingVPA.TargetRef.Name,
},
ApiVersion: controllingVPA.TargetRef.APIVersion,
}
topLevelController, _ := controllerFetcher.FindTopMostWellKnownOrScalable(controller)
return topLevelController
}
return nil
}

// GetControllingVPA returns a VPA object controlling given Pod.
func (cluster *ClusterState) GetControllingVPA(pod *PodState) *Vpa {
for _, vpa := range cluster.Vpas {
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey],
vpa.ID.Namespace, vpa.PodSelector) {
return vpa
}
}
return nil
}

// Implementation of the AggregateStateKey interface. It can be used as a map key.
type aggregateStateKey struct {
namespace string
Expand Down
Loading

0 comments on commit d03624b

Please sign in to comment.