Skip to content

Commit

Permalink
Change garbage collector behaviour, by only removing
Browse files Browse the repository at this point in the history
AggregateCollectionsStates for which corresponding owner controller
doesn't exist anymore.
  • Loading branch information
piotrnosek committed Dec 2, 2021
1 parent a8228b3 commit 9c54eef
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 46 deletions.
72 changes: 55 additions & 17 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
labels "k8s.io/apimachinery/pkg/labels"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher"
vpa_utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -348,15 +349,20 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
return aggregateContainerState
}

func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time) {
// GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more contributive pods,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time, controllerFetcher controllerfetcher.ControllerFetcher) {
klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered")
keysToDelete := make([]AggregateStateKey, 0)
activeKeys := cluster.getActiveAggregateStateKeys()
contributiveKeys := cluster.getContributiveAggregateStateKeys(controllerFetcher)
for key, aggregateContainerState := range cluster.aggregateStateMap {
isKeyActive := activeKeys[key]
if !isKeyActive && aggregateContainerState.isEmpty() {
isKeyContributive := contributiveKeys[key]
if !isKeyContributive && aggregateContainerState.isEmpty() {
keysToDelete = append(keysToDelete, key)
klog.V(1).Infof("Removing empty and inactive AggregateCollectionState for %+v", key)
klog.V(1).Infof("Removing empty and not contributive AggregateCollectionState for %+v", key)
continue
}
if aggregateContainerState.isExpired(now) {
Expand All @@ -375,29 +381,32 @@ func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Ti
// RateLimitedGarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// It performs clean up only if more than `gcInterval` passed since the last time it performed a clean up.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 1) It has no samples and there are no more contributive pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time) {
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time, controllerFetcher controllerfetcher.ControllerFetcher) {
if now.Sub(cluster.lastAggregateContainerStateGC) < cluster.gcInterval {
return
}
cluster.garbageCollectAggregateCollectionStates(now)
cluster.garbageCollectAggregateCollectionStates(now, controllerFetcher)
cluster.lastAggregateContainerStateGC = now
}

func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool {
activeKeys := map[AggregateStateKey]bool{}
func (cluster *ClusterState) getContributiveAggregateStateKeys(controllerFetcher controllerfetcher.ControllerFetcher) map[AggregateStateKey]bool {
contributiveKeys := map[AggregateStateKey]bool{}
for _, pod := range cluster.Pods {
// Pods that will not run anymore are considered inactive.
if pod.Phase == apiv1.PodSucceeded || pod.Phase == apiv1.PodFailed {
continue
}
for container := range pod.Containers {
activeKeys[cluster.MakeAggregateStateKey(pod, container)] = true
// Pod is considered contributive in any of following situations:
// 1) It is in active state - i.e. not PodSucceeded nor PodFailed.
// 2) Its associated controller (e.g. Deploymeny) still exists.
podControllerExists := cluster.GetControllerForPodUnderVPA(pod, controllerFetcher) != nil
podActive := pod.Phase != apiv1.PodSucceeded && pod.Phase != apiv1.PodFailed
if podActive || podControllerExists {
for container := range pod.Containers {
contributiveKeys[cluster.MakeAggregateStateKey(pod, container)] = true
}
}
}
return activeKeys
return contributiveKeys
}

// RecordRecommendation marks the state of recommendation in the cluster. We
Expand Down Expand Up @@ -433,6 +442,35 @@ func (cluster *ClusterState) GetMatchingPods(vpa *Vpa) []PodID {
return matchingPods
}

// GetControllerForPodUnderVPA returns controller associated with given Pod. Returns nil if Pod is not controlled by a VPA object.
func (cluster *ClusterState) GetControllerForPodUnderVPA(pod *PodState, controllerFetcher controllerfetcher.ControllerFetcher) *controllerfetcher.ControllerKeyWithAPIVersion {
controllingVPA := cluster.GetControllingVPA(pod)
if controllingVPA != nil {
controller := &controllerfetcher.ControllerKeyWithAPIVersion{
ControllerKey: controllerfetcher.ControllerKey{
Namespace: controllingVPA.ID.Namespace,
Kind: controllingVPA.TargetRef.Kind,
Name: controllingVPA.TargetRef.Name,
},
ApiVersion: controllingVPA.TargetRef.APIVersion,
}
topLevelController, _ := controllerFetcher.FindTopMostWellKnownOrScalable(controller)
return topLevelController
}
return nil
}

// GetControllingVPA returns a VPA object controlling given Pod.
func (cluster *ClusterState) GetControllingVPA(pod *PodState) *Vpa {
for _, vpa := range cluster.Vpas {
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey],
vpa.ID.Namespace, vpa.PodSelector) {
return vpa
}
}
return nil
}

// Implementation of the AggregateStateKey interface. It can be used as a map key.
type aggregateStateKey struct {
namespace string
Expand Down
113 changes: 87 additions & 26 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"time"

"github.com/stretchr/testify/assert"
autoscaling "k8s.io/api/autoscaling/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
"k8s.io/klog"
)
Expand All @@ -40,8 +42,34 @@ var (
testLabels = map[string]string{"label-1": "value-1"}
emptyLabels = map[string]string{}
testSelectorStr = "label-1 = value-1"
testTargetRef = &autoscaling.CrossVersionObjectReference{
Kind: "kind-1",
Name: "name-1",
APIVersion: "apiVersion-1",
}
testControllerKey = &controllerfetcher.ControllerKeyWithAPIVersion{
ControllerKey: controllerfetcher.ControllerKey{
Kind: "kind-1",
Name: "name-1",
Namespace: "namespace-1",
},
ApiVersion: "apiVersion-1",
}
testControllerFetcher = &fakeControllerFetcher{
key: testControllerKey,
err: nil,
}
)

type fakeControllerFetcher struct {
key *controllerfetcher.ControllerKeyWithAPIVersion
err error
}

func (f *fakeControllerFetcher) FindTopMostWellKnownOrScalable(controller *controllerfetcher.ControllerKeyWithAPIVersion) (*controllerfetcher.ControllerKeyWithAPIVersion, error) {
return f.key, f.err
}

const testGcPeriod = time.Minute

func makeTestUsageSample() *ContainerUsageSampleWithKey {
Expand Down Expand Up @@ -83,7 +111,7 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) {
assert.NotEmpty(t, vpa.aggregateContainerStates)

// AggegateContainerState are valid for 8 days since last sample
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9*24*time.Hour), testControllerFetcher)

// AggegateContainerState should be deleted from both cluster and vpa
assert.Empty(t, cluster.aggregateStateMap)
Expand All @@ -109,45 +137,78 @@ func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) {
}

// Verify empty aggregate states are not removed right away.
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(1*time.Minute), testControllerFetcher) // AggegateContainerState should be deleted from both cluster and vpa
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

// AggegateContainerState are valid for 8 days since creation
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(9*24*time.Hour), testControllerFetcher)

// AggegateContainerState should be deleted from both cluster and vpa
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) {
func TestClusterGCAggregateContainerStateDeletesEmptyInactiveWithoutController(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
pod := addTestPod(cluster)
// Controller Fetcher returns nil, meaning that there is no corresponding controller alive.
controller := &fakeControllerFetcher{
key: nil,
err: nil,
}

assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
// No usage samples added.

assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.garbageCollectAggregateCollectionStates(testTimestamp)
cluster.garbageCollectAggregateCollectionStates(testTimestamp, controller)

// AggegateContainerState should not be deleted as the pod is still active.
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded
cluster.garbageCollectAggregateCollectionStates(testTimestamp)
cluster.garbageCollectAggregateCollectionStates(testTimestamp, controller)

// AggegateContainerState should be empty as the pod is no longer active and
// there are no usage samples.
// AggegateContainerState should be empty as the pod is no longer active, controller is not alive
// and there are no usage samples.
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateLeavesEmptyInactiveWithController(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
pod := addTestPod(cluster)
// Controller Fetcher returns existing controller, meaning that there is a corresponding controller alive.
controller := testControllerFetcher

assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
// No usage samples added.

assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.garbageCollectAggregateCollectionStates(testTimestamp, controller)

// AggegateContainerState should not be deleted as the pod is still active.
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded
cluster.garbageCollectAggregateCollectionStates(testTimestamp, controller)

// AggegateContainerState should not be delated as the controller is still alive.
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState(testGcPeriod)
Expand All @@ -164,7 +225,7 @@ func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
assert.NotEmpty(t, vpa.aggregateContainerStates)

// AggegateContainerState are valid for 8 days since last sample
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7*24*time.Hour), testControllerFetcher)

assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
Expand All @@ -191,7 +252,7 @@ func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) {

// AggegateContainerState are invalid after 8 days since last sample
gcTimestamp := usageSample.MeasureStart.Add(10 * 24 * time.Hour)
cluster.garbageCollectAggregateCollectionStates(gcTimestamp)
cluster.garbageCollectAggregateCollectionStates(gcTimestamp, testControllerFetcher)

assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
Expand All @@ -216,7 +277,7 @@ func TestClusterGCRateLimiting(t *testing.T) {
sampleExpireTime := usageSample.MeasureStart.Add(9 * 24 * time.Hour)
// AggegateContainerState are valid for 8 days since last sample but this run
// doesn't remove the sample, because we didn't add it yet.
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime)
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime, testControllerFetcher)
vpa := addTestVpa(cluster)
addTestPod(cluster)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
Expand All @@ -228,12 +289,12 @@ func TestClusterGCRateLimiting(t *testing.T) {

// Sample is expired but this run doesn't remove it yet, because less than testGcPeriod
// elapsed since the previous run.
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(testGcPeriod / 2))
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(testGcPeriod/2), testControllerFetcher)
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

// AggegateContainerState should be deleted from both cluster and vpa
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(2 * testGcPeriod))
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(2*testGcPeriod), testControllerFetcher)
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}
Expand Down Expand Up @@ -266,9 +327,9 @@ func TestMissingKeys(t *testing.T) {
assert.EqualError(t, err, "KeyError: {namespace-1 pod-1}")
}

func addVpa(cluster *ClusterState, id VpaID, annotations vpaAnnotationsMap, selector string) *Vpa {
func addVpa(cluster *ClusterState, id VpaID, annotations vpaAnnotationsMap, selector string, targetRef *autoscaling.CrossVersionObjectReference) *Vpa {
apiObject := test.VerticalPodAutoscaler().WithNamespace(id.Namespace).
WithName(id.VpaName).WithContainer(testContainerID.ContainerName).WithAnnotations(annotations).Get()
WithName(id.VpaName).WithContainer(testContainerID.ContainerName).WithAnnotations(annotations).WithTargetRef(targetRef).Get()
return addVpaObject(cluster, id, apiObject, selector)
}

Expand All @@ -283,7 +344,7 @@ func addVpaObject(cluster *ClusterState, id VpaID, vpa *vpa_types.VerticalPodAut
}

func addTestVpa(cluster *ClusterState) *Vpa {
return addVpa(cluster, testVpaID, testAnnotations, testSelectorStr)
return addVpa(cluster, testVpaID, testAnnotations, testSelectorStr, testTargetRef)
}

func addTestPod(cluster *ClusterState) *PodState {
Expand Down Expand Up @@ -343,11 +404,11 @@ func TestUpdateAnnotations(t *testing.T) {
assert.Equal(t, vpa.Annotations, testAnnotations)
// Update the annotations (non-empty).
annotations := vpaAnnotationsMap{"key-2": "value-2"}
vpa = addVpa(cluster, testVpaID, annotations, testSelectorStr)
vpa = addVpa(cluster, testVpaID, annotations, testSelectorStr, testTargetRef)
assert.Equal(t, vpa.Annotations, annotations)
// Update the annotations (empty).
annotations = vpaAnnotationsMap{}
vpa = addVpa(cluster, testVpaID, annotations, testSelectorStr)
vpa = addVpa(cluster, testVpaID, annotations, testSelectorStr, testTargetRef)
assert.Equal(t, vpa.Annotations, annotations)
}

Expand All @@ -362,15 +423,15 @@ func TestUpdatePodSelector(t *testing.T) {
addTestContainer(cluster)

// Update the VPA selector such that it still matches the Pod.
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 in (value-1,value-2)")
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 in (value-1,value-2)", testTargetRef)
assert.Contains(t, vpa.aggregateContainerStates, cluster.aggregateStateKeyForContainerID(testContainerID))

// Update the VPA selector to no longer match the Pod.
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 = value-2")
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 = value-2", testTargetRef)
assert.NotContains(t, vpa.aggregateContainerStates, cluster.aggregateStateKeyForContainerID(testContainerID))

// Update the VPA selector to match the Pod again.
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 = value-1")
vpa = addVpa(cluster, testVpaID, testAnnotations, "label-1 = value-1", testTargetRef)
assert.Contains(t, vpa.aggregateContainerStates, cluster.aggregateStateKeyForContainerID(testContainerID))
}

Expand Down Expand Up @@ -566,7 +627,7 @@ func TestTwoPodsWithDifferentNamespaces(t *testing.T) {
func TestEmptySelector(t *testing.T) {
cluster := NewClusterState(testGcPeriod)
// Create a VPA with an empty selector (matching all pods).
vpa := addVpa(cluster, testVpaID, testAnnotations, "")
vpa := addVpa(cluster, testVpaID, testAnnotations, "", testTargetRef)
// Create a pod with labels. Add a container.
cluster.AddOrUpdatePod(testPodID, testLabels, apiv1.PodRunning)
containerID1 := ContainerID{testPodID, "foo"}
Expand Down Expand Up @@ -635,7 +696,7 @@ func TestRecordRecommendation(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, testSelectorStr)
vpa := addVpa(cluster, testVpaID, testAnnotations, testSelectorStr, testTargetRef)
cluster.Vpas[testVpaID].Recommendation = tc.recommendation
if !tc.lastLogged.IsZero() {
cluster.EmptyVPAs[testVpaID] = tc.lastLogged
Expand Down Expand Up @@ -718,7 +779,7 @@ func TestGetActiveMatchingPods(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector, testTargetRef)
for _, pod := range tc.pods {
cluster.AddOrUpdatePod(pod.id, pod.labels, pod.phase)
}
Expand Down Expand Up @@ -792,7 +853,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name+", VPA first", func(t *testing.T) {
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector, testTargetRef)
for _, podDesc := range tc.pods {
cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase)
containerID := ContainerID{testPodID, "foo"}
Expand All @@ -810,7 +871,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
containerID := ContainerID{testPodID, "foo"}
assert.NoError(t, cluster.AddOrUpdateContainer(containerID, testRequest))
}
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector, testTargetRef)
assert.Equal(t, tc.expectedMatch, cluster.Vpas[vpa.ID].PodCount)
})
}
Expand Down
Loading

0 comments on commit 9c54eef

Please sign in to comment.