diff --git a/cluster-autoscaler/context/autoscaling_context.go b/cluster-autoscaler/context/autoscaling_context.go index d26c8d81bf82..0f89faaf39d5 100644 --- a/cluster-autoscaler/context/autoscaling_context.go +++ b/cluster-autoscaler/context/autoscaling_context.go @@ -20,6 +20,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" @@ -50,6 +51,8 @@ type AutoscalingContext struct { EstimatorBuilder estimator.EstimatorBuilder // ProcessorCallbacks is interface defining extra callback methods which can be called by processors used in extension points. ProcessorCallbacks processor_callbacks.ProcessorCallbacks + // DebuggingSnapshotter is the interface for capturing the debugging snapshot + DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter } // AutoscalingKubeClients contains all Kubernetes API clients, @@ -93,7 +96,8 @@ func NewAutoscalingContext( cloudProvider cloudprovider.CloudProvider, expanderStrategy expander.Strategy, estimatorBuilder estimator.EstimatorBuilder, - processorCallbacks processor_callbacks.ProcessorCallbacks) *AutoscalingContext { + processorCallbacks processor_callbacks.ProcessorCallbacks, + debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *AutoscalingContext { return &AutoscalingContext{ AutoscalingOptions: options, CloudProvider: cloudProvider, @@ -103,6 +107,7 @@ func NewAutoscalingContext( ExpanderStrategy: expanderStrategy, EstimatorBuilder: estimatorBuilder, ProcessorCallbacks: processorCallbacks, + DebuggingSnapshotter: debuggingSnapshotter, } } diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 64c7ef74e190..6fed440793e9 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -25,6 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/factory" @@ -48,6 +49,7 @@ type AutoscalerOptions struct { EstimatorBuilder estimator.EstimatorBuilder Processors *ca_processors.AutoscalingProcessors Backoff backoff.Backoff + DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter } // Autoscaler is the main component of CA which scales up/down node groups according to its configuration @@ -76,7 +78,8 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) opts.CloudProvider, opts.ExpanderStrategy, opts.EstimatorBuilder, - opts.Backoff), nil + opts.Backoff, + opts.DebuggingSnapshotter), nil } // Initialize default options if not provided. diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go index 1b0bc6caf403..b48b8eae2be6 100644 --- a/cluster-autoscaler/core/filter_out_schedulable.go +++ b/cluster-autoscaler/core/filter_out_schedulable.go @@ -78,6 +78,12 @@ func (p *filterOutSchedulablePodListProcessor) Process( if len(unschedulablePodsToHelp) != len(unschedulablePods) { klog.V(2).Info("Schedulable pods present") context.ProcessorCallbacks.DisableScaleDownForLoop() + + if context.DebuggingSnapshotter.IsDataCollectionAllowed() { + schedulablePods := findSchedulablePods(unschedulablePods, unschedulablePodsToHelp) + context.DebuggingSnapshotter.SetUnscheduledPodsCanBeScheduled(schedulablePods) + } + } else { klog.V(4).Info("No schedulable pods") } @@ -179,3 +185,17 @@ func moreImportantPod(pod1, pod2 *apiv1.Pod) bool { p2 := corev1helpers.PodPriority(pod2) return p1 > p2 } + +func findSchedulablePods(allUnschedulablePods, podsStillUnschedulable []*apiv1.Pod) []*apiv1.Pod { + podsStillUnschedulableMap := make(map[*apiv1.Pod]struct{}, len(podsStillUnschedulable)) + for _, x := range podsStillUnschedulable { + podsStillUnschedulableMap[x] = struct{}{} + } + var schedulablePods []*apiv1.Pod + for _, x := range allUnschedulablePods { + if _, found := podsStillUnschedulableMap[x]; !found { + schedulablePods = append(schedulablePods, x) + } + } + return schedulablePods +} diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index 5e1aace44eb3..7075d05bce55 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -136,7 +136,7 @@ func TestFindUnneededNodes(t *testing.T) { }, UnremovableNodeRecheckTimeout: 5 * time.Minute, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -252,7 +252,7 @@ func TestFindUnneededGPUNodes(t *testing.T) { }, UnremovableNodeRecheckTimeout: 5 * time.Minute, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -359,7 +359,7 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) { } for tn, tc := range cases { t.Run(tn, func(t *testing.T) { - context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) sd := NewScaleDown(&context, NewTestProcessors(), clusterStateRegistry) @@ -434,7 +434,7 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) { ScaleDownUtilizationThreshold: 0.35, }, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -491,7 +491,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) { ScaleDownCandidatesPoolRatio: 1, ScaleDownCandidatesPoolMinCount: 1000, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -566,7 +566,7 @@ func TestFindUnneededEmptyNodes(t *testing.T) { ScaleDownCandidatesPoolRatio: 1.0, ScaleDownCandidatesPoolMinCount: 1000, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -616,7 +616,7 @@ func TestFindUnneededNodePool(t *testing.T) { ScaleDownCandidatesPoolRatio: 0.1, ScaleDownCandidatesPoolMinCount: 10, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -760,7 +760,7 @@ func TestDeleteNode(t *testing.T) { // build context registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -1146,7 +1146,7 @@ func TestScaleDown(t *testing.T) { assert.NoError(t, err) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} @@ -1331,7 +1331,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) { provider.AddNode("ng1", n1) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) if scenario.nodeInfoSuccess { @@ -1554,7 +1554,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { assert.NotNil(t, provider) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -1643,7 +1643,7 @@ func TestNoScaleDownUnready(t *testing.T) { MaxGracefulTerminationSec: 60, } registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} @@ -1756,7 +1756,7 @@ func TestScaleDownNoMove(t *testing.T) { assert.NoError(t, err) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} @@ -2006,7 +2006,7 @@ func TestSoftTaint(t *testing.T) { assert.NoError(t, err) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -2127,7 +2127,7 @@ func TestSoftTaintTimeLimit(t *testing.T) { assert.NoError(t, err) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil) assert.NoError(t, err) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 57a4e48d5200..c3f6cac08748 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -21,6 +21,8 @@ import ( "reflect" "testing" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" testcloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" @@ -155,7 +157,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors { func NewScaleTestAutoscalingContext( options config.AutoscalingOptions, fakeClient kube_client.Interface, listers kube_util.ListerRegistry, provider cloudprovider.CloudProvider, - processorCallbacks processor_callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) { + processorCallbacks processor_callbacks.ProcessorCallbacks, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (context.AutoscalingContext, error) { // Not enough buffer space causes the test to hang without printing any logs. // This is not useful. fakeRecorder := kube_record.NewFakeRecorder(100) @@ -170,6 +172,9 @@ func NewScaleTestAutoscalingContext( if err != nil { return context.AutoscalingContext{}, err } + if debuggingSnapshotter == nil { + debuggingSnapshotter = debuggingsnapshot.NewDebuggingSnapshotter(false) + } clusterSnapshot := simulator.NewBasicClusterSnapshot() return context.AutoscalingContext{ AutoscalingOptions: options, @@ -179,12 +184,13 @@ func NewScaleTestAutoscalingContext( LogRecorder: fakeLogRecorder, ListerRegistry: listers, }, - CloudProvider: provider, - PredicateChecker: predicateChecker, - ClusterSnapshot: clusterSnapshot, - ExpanderStrategy: random.NewStrategy(), - EstimatorBuilder: estimatorBuilder, - ProcessorCallbacks: processorCallbacks, + CloudProvider: provider, + PredicateChecker: predicateChecker, + ClusterSnapshot: clusterSnapshot, + ExpanderStrategy: random.NewStrategy(), + EstimatorBuilder: estimatorBuilder, + ProcessorCallbacks: processorCallbacks, + DebuggingSnapshotter: debuggingSnapshotter, }, nil } diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 1ae8a12be17d..a05388bf0bcf 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -518,7 +518,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul assert.NotNil(t, provider) // Create context with non-random expander strategy. - context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) expander := reportingStrategy{ @@ -684,7 +684,7 @@ func TestScaleUpUnhealthy(t *testing.T) { MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} @@ -724,7 +724,7 @@ func TestScaleUpNoHelp(t *testing.T) { MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) nodes := []*apiv1.Node{n1} @@ -790,7 +790,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { MaxCoresTotal: config.DefaultMaxClusterCores, MaxMemoryTotal: config.DefaultMaxClusterMemory, } - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) @@ -851,7 +851,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { } podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil) assert.NoError(t, err) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -904,7 +904,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { } podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil) + context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil) assert.NoError(t, err) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) @@ -980,7 +980,7 @@ func TestCheckScaleUpDeltaWithinLimits(t *testing.T) { func TestAuthError(t *testing.T) { metrics.RegisterAll(false) - context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil) + context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, nil, nil, nil) assert.NoError(t, err) nodeGroup := &mockprovider.NodeGroup{} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 70ead85d1654..0338062fe36a 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -23,6 +23,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -119,7 +120,8 @@ func NewStaticAutoscaler( cloudProvider cloudprovider.CloudProvider, expanderStrategy expander.Strategy, estimatorBuilder estimator.EstimatorBuilder, - backoff backoff.Backoff) *StaticAutoscaler { + backoff backoff.Backoff, + debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) *StaticAutoscaler { processorCallbacks := newStaticAutoscalerProcessorCallbacks() autoscalingContext := context.NewAutoscalingContext( @@ -130,7 +132,8 @@ func NewStaticAutoscaler( cloudProvider, expanderStrategy, estimatorBuilder, - processorCallbacks) + processorCallbacks, + debuggingSnapshotter) clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage, @@ -220,6 +223,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError a.cleanUpIfRequired() a.processorCallbacks.reset() a.clusterStateRegistry.PeriodicCleanup() + a.DebuggingSnapshotter.StartDataCollection() + defer a.DebuggingSnapshotter.Flush() unschedulablePodLister := a.UnschedulablePodLister() scheduledPodLister := a.ScheduledPodLister() @@ -409,6 +414,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError } } + l, err := a.ClusterSnapshot.NodeInfos().List() + if err != nil { + klog.Errorf("Unable to fetch NodeInfo List for Debugging Snapshot, %v", err) + } else { + a.AutoscalingContext.DebuggingSnapshotter.SetNodeGroupInfo(l) + } + unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) @@ -715,6 +727,7 @@ func (a *StaticAutoscaler) filterOutYoungPods(allUnschedulablePods []*apiv1.Pod, // ExitCleanUp performs all necessary clean-ups when the autoscaler's exiting. func (a *StaticAutoscaler) ExitCleanUp() { a.processors.CleanUp() + a.DebuggingSnapshotter.Cleanup() if !a.AutoscalingContext.WriteStatusConfigMap { return diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index e51c433fd5b3..d6309590178d 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -185,7 +185,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -376,7 +376,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -511,7 +511,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -658,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -786,7 +786,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -882,7 +882,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, @@ -944,7 +944,7 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { } processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) assert.NoError(t, err) clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ diff --git a/cluster-autoscaler/debuggingsnapshot/debugging_snapshot.go b/cluster-autoscaler/debuggingsnapshot/debugging_snapshot.go new file mode 100644 index 000000000000..b5d91ebfe409 --- /dev/null +++ b/cluster-autoscaler/debuggingsnapshot/debugging_snapshot.go @@ -0,0 +1,150 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debuggingsnapshot + +import ( + "encoding/json" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +// NodeInfo captures a single entity of nodeInfo. i.e. Node specs and all the pods on that node. +type NodeInfo struct { + Node *v1.Node `json:"Node"` + Pods []*framework.PodInfo `json:"Pods"` +} + +// DebuggingSnapshot is the interface used to define any debugging snapshot +// implementation, incl. any custom impl. to be used by DebuggingSnapshotter +type DebuggingSnapshot interface { + // SetNodeGroupInfo is a setter to capture all the NodeInfo + SetNodeGroupInfo([]*framework.NodeInfo) + // SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled + // but they can be scheduled. i.e. pods which aren't triggering scale-up + SetUnscheduledPodsCanBeScheduled([]*v1.Pod) + // SetErrorMessage sets the error message in the snapshot + SetErrorMessage(string) + // SetEndTimestamp sets the timestamp in the snapshot, + // when all the data collection is finished + SetEndTimestamp(time.Time) + // SetStartTimestamp sets the timestamp in the snapshot, + // when all the data collection is started + SetStartTimestamp(time.Time) + // GetOutputBytes return the output state of the Snapshot with bool to specify if + // the snapshot has the error message set + GetOutputBytes() ([]byte, bool) + // Cleanup clears the internal data obj of the snapshot, readying for next request + Cleanup() +} + +// DebuggingSnapshotImpl is the struct used to collect all the data to be output. +// Please add all new output fields in this struct. This is to make the data +// encoding/decoding easier as the single object going into the decoder +type DebuggingSnapshotImpl struct { + NodeInfo []*NodeInfo `json:"NodeList"` + UnscheduledPodsCanBeScheduled []*v1.Pod `json:"UnscheduledPodsCanBeScheduled"` + Error string `json:"Error,omitempty"` + StartTimestamp time.Time `json:"StartTimestamp"` + EndTimestamp time.Time `json:"EndTimestamp"` +} + +// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled +func (s *DebuggingSnapshotImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) { + if podList == nil { + return + } + + s.UnscheduledPodsCanBeScheduled = nil + for _, pod := range podList { + s.UnscheduledPodsCanBeScheduled = append(s.UnscheduledPodsCanBeScheduled, pod) + } +} + +// SetNodeGroupInfo is the setter for Node Group Info +// All filtering/prettifying of data should be done here. +func (s *DebuggingSnapshotImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) { + if nodeInfos == nil { + return + } + + var NodeInfoList []*NodeInfo + + for _, n := range nodeInfos { + nClone := n.Clone() + node := nClone.Node() + + nodeInfo := &NodeInfo{ + Node: node, + Pods: nClone.Pods, + } + + NodeInfoList = append(NodeInfoList, nodeInfo) + } + s.NodeInfo = NodeInfoList +} + +// SetEndTimestamp is the setter for end timestamp +func (s *DebuggingSnapshotImpl) SetEndTimestamp(t time.Time) { + s.EndTimestamp = t +} + +// SetStartTimestamp is the setter for end timestamp +func (s *DebuggingSnapshotImpl) SetStartTimestamp(t time.Time) { + s.StartTimestamp = t +} + +// GetOutputBytes return the output state of the Snapshot with bool to specify if +// the snapshot has the error message set +func (s *DebuggingSnapshotImpl) GetOutputBytes() ([]byte, bool) { + errMsgSet := false + if s.Error != "" { + klog.Errorf("Debugging snapshot found with error message set when GetOutputBytes() is called. - ", s.Error) + errMsgSet = true + } + + klog.Infof("Debugging snapshot flush ready") + marshalOutput, err := json.Marshal(s) + + // this error captures if the snapshot couldn't be marshalled, hence we create a new object + // and return the error message + if err != nil { + klog.Errorf("Unable to json marshal the debugging snapshot: %v", err) + errorSnapshot := DebuggingSnapshotImpl{} + errorSnapshot.SetErrorMessage("Unable to marshal the snapshot, " + err.Error()) + errorSnapshot.SetEndTimestamp(s.EndTimestamp) + errorSnapshot.SetStartTimestamp(s.StartTimestamp) + errorMarshal, err1 := json.Marshal(errorSnapshot) + klog.Errorf("Unable to marshal a new Debugging Snapshot Impl, with just a error message: %v", err1) + return errorMarshal, true + } + + return marshalOutput, errMsgSet +} + +// SetErrorMessage sets the error message in the snapshot +func (s *DebuggingSnapshotImpl) SetErrorMessage(error string) { + s.Error = error +} + +// Cleanup cleans up all the data in the snapshot without changing the +// pointer reference +func (s *DebuggingSnapshotImpl) Cleanup() { + *s = DebuggingSnapshotImpl{} +} diff --git a/cluster-autoscaler/debuggingsnapshot/debugging_snapshot_test.go b/cluster-autoscaler/debuggingsnapshot/debugging_snapshot_test.go new file mode 100644 index 000000000000..7857c54bee9f --- /dev/null +++ b/cluster-autoscaler/debuggingsnapshot/debugging_snapshot_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debuggingsnapshot + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +func TestBasicSetterWorkflow(t *testing.T) { + snapshot := &DebuggingSnapshotImpl{} + pod := []*framework.PodInfo{ + { + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Pod1", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + }, + }, + }, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode", + }, + } + + nodeInfo := &framework.NodeInfo{ + Pods: pod, + Requested: &framework.Resource{}, + NonZeroRequested: &framework.Resource{}, + Allocatable: &framework.Resource{}, + Generation: 0, + } + + var nodeGroups []*framework.NodeInfo + nodeGroups = append(nodeGroups, nodeInfo) + nodeGroups[0].SetNode(node) + timestamp := time.Now().In(time.UTC) + snapshot.SetNodeGroupInfo(nodeGroups) + snapshot.SetEndTimestamp(timestamp) + op, err := snapshot.GetOutputBytes() + assert.False(t, err) + + type JSONList = []interface{} + type JSONMap = map[string]interface{} + var String = "test" + + var parsed map[string]interface{} + er := json.Unmarshal(op, &parsed) + assert.NoError(t, er) + assert.IsType(t, JSONMap{}, parsed) + assert.IsType(t, JSONList{}, parsed["NodeList"]) + assert.Greater(t, len(parsed["NodeList"].([]interface{})), 0) + assert.IsType(t, JSONMap{}, parsed["NodeList"].([]interface{})[0]) + pNodeInfo := parsed["NodeList"].([]interface{})[0].(map[string]interface{}) + assert.IsType(t, JSONMap{}, pNodeInfo["Node"].(map[string]interface{})) + pNode := pNodeInfo["Node"].(map[string]interface{}) + assert.IsType(t, JSONMap{}, pNode["metadata"].(map[string]interface{})) + pNodeObjectMeta := pNode["metadata"].(map[string]interface{}) + assert.IsType(t, String, pNodeObjectMeta["name"]) + pNodeName := pNodeObjectMeta["name"].(string) + assert.Equal(t, pNodeName, "testNode") + + assert.IsType(t, JSONList{}, pNodeInfo["Pods"]) + assert.Greater(t, len(pNodeInfo["Pods"].([]interface{})), 0) + assert.IsType(t, JSONMap{}, pNodeInfo["Pods"].([]interface{})[0]) + pPodInfo := pNodeInfo["Pods"].([]interface{})[0].(map[string]interface{}) + assert.IsType(t, JSONMap{}, pPodInfo["Pod"]) + pPod := pPodInfo["Pod"].(map[string]interface{}) + assert.IsType(t, JSONMap{}, pPod["metadata"]) + pPodMeta := pPod["metadata"].(map[string]interface{}) + assert.IsType(t, String, pPodMeta["name"]) + pPodName := pPodMeta["name"].(string) + assert.Equal(t, pPodName, "Pod1") + +} + +func TestEmptyDataNoError(t *testing.T) { + snapshot := &DebuggingSnapshotImpl{} + op, err := snapshot.GetOutputBytes() + assert.False(t, err) + assert.NotNil(t, op) +} diff --git a/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter.go b/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter.go new file mode 100644 index 000000000000..e11d65fe2a2e --- /dev/null +++ b/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter.go @@ -0,0 +1,238 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debuggingsnapshot + +import ( + "context" + "net/http" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +// DebuggingSnapshotterState is the type for the debugging snapshot State machine +// The states guide the workflow of the snapshot. +type DebuggingSnapshotterState int + +// DebuggingSnapshotterState help navigate the different workflows of the snapshot capture. +const ( + // SNAPSHOTTER_DISABLED is when debuggingSnapshot is disabled on the cluster and no action can be taken + SNAPSHOTTER_DISABLED DebuggingSnapshotterState = iota + 1 + // LISTENING is set when snapshotter is enabled on the cluster and is ready to listen to a + // snapshot request. Used by ResponseHandler to wait on to listen to request + LISTENING + // TRIGGER_ENABLED is set by ResponseHandler if a valid snapshot request is received + // it states that a snapshot request needs to be processed + TRIGGER_ENABLED + // START_DATA_COLLECTION is used to synchronise the collection of data. + // Since the trigger is an asynchronous process, data collection could be started mid-loop + // leading to incomplete data. So setter methods wait for START_DATA_COLLECTION before collecting data + // which is set at the start of the next loop after receiving the trigger + START_DATA_COLLECTION + // DATA_COLLECTED is set by setter func (also used by setter func for data collection) + // This is set to let Flush know that at least some data collected and there isn't + // an error State leading to no data collection + DATA_COLLECTED +) + +// DebuggingSnapshotterImpl is the impl for DebuggingSnapshotter +type DebuggingSnapshotterImpl struct { + // State captures the internal state of the snapshotter + State *DebuggingSnapshotterState + // DebuggingSnapshot is the data bean for the snapshot + DebuggingSnapshot DebuggingSnapshot + // Mutex is the synchronisation used to the methods/states in the critical section + Mutex *sync.Mutex + // Trigger is the channel on which the Response Handler waits on to know + // when there is data to be flushed back to the channel from the Snapshot + Trigger chan struct{} + // CancelRequest is the cancel function for the snapshot request. It is used to + // terminate any ongoing request when CA is shutting down + CancelRequest context.CancelFunc +} + +// DebuggingSnapshotter is the interface for debugging snapshot +type DebuggingSnapshotter interface { + + // StartDataCollection will check the State(s) and enable data + // collection for the loop if applicable + StartDataCollection() + // SetNodeGroupInfo is a setter to capture all the NodeInfo + SetNodeGroupInfo([]*framework.NodeInfo) + // SetUnscheduledPodsCanBeScheduled is a setter for all pods which are unscheduled + // but they can be scheduled. i.e. pods which aren't triggering scale-up + SetUnscheduledPodsCanBeScheduled([]*v1.Pod) + // ResponseHandler is the http response handler to manage incoming requests + ResponseHandler(http.ResponseWriter, *http.Request) + // IsDataCollectionAllowed checks the internal State of the snapshotter + // to find if data can be collected. This can be used before preprocessing + // for the snapshot + IsDataCollectionAllowed() bool + // Flush triggers the flushing of the snapshot + Flush() + // Cleanup clears the internal data beans of the snapshot, readying for next request + Cleanup() +} + +// NewDebuggingSnapshotter returns a new instance of DebuggingSnapshotter +func NewDebuggingSnapshotter(isDebuggerEnabled bool) DebuggingSnapshotter { + state := SNAPSHOTTER_DISABLED + if isDebuggerEnabled { + klog.Infof("Debugging Snapshot is enabled") + state = LISTENING + } + return &DebuggingSnapshotterImpl{ + State: &state, + Mutex: &sync.Mutex{}, + DebuggingSnapshot: &DebuggingSnapshotImpl{}, + Trigger: make(chan struct{}, 1), + } +} + +// ResponseHandler is the impl for request handler +func (d *DebuggingSnapshotterImpl) ResponseHandler(w http.ResponseWriter, r *http.Request) { + + d.Mutex.Lock() + // checks if the handler is in the correct State to accept a new snapshot request + if *d.State != LISTENING { + defer d.Mutex.Unlock() + klog.Errorf("Debugging Snapshot is currently being processed. Another snapshot can't be processed") + w.WriteHeader(http.StatusTooManyRequests) + w.Write([]byte("Another debugging snapshot request is being processed. Concurrent requests not supported")) + return + } + + ctx, cancel := context.WithCancel(r.Context()) + d.CancelRequest = cancel + + klog.Infof("Received a new snapshot, that is accepted") + // set the State to trigger enabled, to allow workflow to collect data + *d.State = TRIGGER_ENABLED + d.Mutex.Unlock() + + select { + case <-d.Trigger: + d.Mutex.Lock() + d.DebuggingSnapshot.SetEndTimestamp(time.Now().In(time.UTC)) + body, isErrorMessage := d.DebuggingSnapshot.GetOutputBytes() + if isErrorMessage { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + w.Write(body) + + // reset the debugging State to receive a new snapshot request + *d.State = LISTENING + d.CancelRequest = nil + d.DebuggingSnapshot.Cleanup() + + d.Mutex.Unlock() + case <-ctx.Done(): + d.Mutex.Lock() + klog.Infof("Received terminate trigger, aborting ongoing snapshot request") + w.WriteHeader(http.StatusServiceUnavailable) + + d.DebuggingSnapshot.Cleanup() + *d.State = LISTENING + d.CancelRequest = nil + select { + case <-d.Trigger: + default: + } + d.Mutex.Unlock() + } +} + +// IsDataCollectionAllowed encapsulate the check to know if data collection is currently active +// This should be used by setters and by any function that is contingent on data collection State +// before doing extra processing. +// e.g. If you want to pre-process a particular State in cloud-provider for snapshot +// you should check this func in the loop before doing that extra processing +func (d *DebuggingSnapshotterImpl) IsDataCollectionAllowed() bool { + d.Mutex.Lock() + defer d.Mutex.Unlock() + return *d.State == DATA_COLLECTED || *d.State == START_DATA_COLLECTION +} + +// StartDataCollection changes the State when the trigger has been enabled +// to start data collection. To be done at the start of the runLoop to allow for consistency +// as the trigger can be called mid-loop leading to partial data collection +func (d *DebuggingSnapshotterImpl) StartDataCollection() { + d.Mutex.Lock() + defer d.Mutex.Unlock() + if *d.State == TRIGGER_ENABLED { + *d.State = START_DATA_COLLECTION + klog.Infof("Trigger Enabled for Debugging Snapshot, starting data collection") + d.DebuggingSnapshot.SetStartTimestamp(time.Now().In(time.UTC)) + } +} + +// Flush is the impl for DebuggingSnapshotter.Flush +// It checks if any data has been collected or data collection failed +func (d *DebuggingSnapshotterImpl) Flush() { + d.Mutex.Lock() + defer d.Mutex.Unlock() + + // Case where Data Collection was started but no data was collected, needs to + // be stated as an error and reset to pre-trigger State + if *d.State == START_DATA_COLLECTION { + klog.Errorf("No data was collected for the snapshot in this loop. So no snapshot can be generated.") + d.DebuggingSnapshot.SetErrorMessage("Unable to collect any data") + d.Trigger <- struct{}{} + return + } + + if *d.State == DATA_COLLECTED { + d.Trigger <- struct{}{} + } +} + +// SetNodeGroupInfo is the setter for Node Group Info +// All filtering/prettifying of data should be done here. +func (d *DebuggingSnapshotterImpl) SetNodeGroupInfo(nodeInfos []*framework.NodeInfo) { + if !d.IsDataCollectionAllowed() { + return + } + d.Mutex.Lock() + defer d.Mutex.Unlock() + klog.Infof("NodeGroupInfo is being set for the debugging snapshot") + d.DebuggingSnapshot.SetNodeGroupInfo(nodeInfos) + *d.State = DATA_COLLECTED +} + +// SetUnscheduledPodsCanBeScheduled is the setter for UnscheduledPodsCanBeScheduled +func (d *DebuggingSnapshotterImpl) SetUnscheduledPodsCanBeScheduled(podList []*v1.Pod) { + if !d.IsDataCollectionAllowed() { + return + } + d.Mutex.Lock() + defer d.Mutex.Unlock() + klog.Infof("UnscheduledPodsCanBeScheduled is being set for the debugging snapshot") + d.DebuggingSnapshot.SetUnscheduledPodsCanBeScheduled(podList) + *d.State = DATA_COLLECTED +} + +// Cleanup clears the internal data sets of the cluster +func (d *DebuggingSnapshotterImpl) Cleanup() { + if d.CancelRequest != nil { + d.CancelRequest() + } +} diff --git a/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter_test.go b/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter_test.go new file mode 100644 index 000000000000..c894276034fb --- /dev/null +++ b/cluster-autoscaler/debuggingsnapshot/debugging_snapshotter_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debuggingsnapshot + +import ( + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +func TestBasicSnapshotRequest(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + snapshotter := NewDebuggingSnapshotter(true) + + pod := []*framework.PodInfo{ + { + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Pod1", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + }, + }, + }, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode", + }, + } + + nodeInfo := &framework.NodeInfo{ + Pods: pod, + Requested: &framework.Resource{}, + NonZeroRequested: &framework.Resource{}, + Allocatable: &framework.Resource{}, + Generation: 0, + } + + var nodeGroups []*framework.NodeInfo + nodeGroups = append(nodeGroups, nodeInfo) + nodeGroups[0].SetNode(node) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + + go func() { + snapshotter.ResponseHandler(w, req) + wg.Done() + }() + + for !snapshotter.IsDataCollectionAllowed() { + snapshotter.StartDataCollection() + } + snapshotter.SetNodeGroupInfo(nodeGroups) + snapshotter.Flush() + + wg.Wait() + resp := w.Result() + assert.Equal(t, http.StatusOK, w.Code) + assert.Greater(t, int64(0), resp.ContentLength) +} + +func TestFlushWithoutData(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + snapshotter := NewDebuggingSnapshotter(true) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + + go func() { + snapshotter.ResponseHandler(w, req) + wg.Done() + }() + + for !snapshotter.IsDataCollectionAllowed() { + snapshotter.StartDataCollection() + } + snapshotter.Flush() + + wg.Wait() + resp := w.Result() + assert.Equal(t, http.StatusInternalServerError, w.Code) + assert.Greater(t, int64(0), resp.ContentLength) +} + +func TestRequestTerminationOnShutdown(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + snapshotter := NewDebuggingSnapshotter(true) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + + go func() { + snapshotter.ResponseHandler(w, req) + wg.Done() + }() + + for !snapshotter.IsDataCollectionAllowed() { + snapshotter.StartDataCollection() + } + + go snapshotter.Cleanup() + wg.Wait() + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) +} + +func TestRejectParallelRequest(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + snapshotter := NewDebuggingSnapshotter(true) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + w := httptest.NewRecorder() + + go func() { + snapshotter.ResponseHandler(w, req) + wg.Done() + }() + + for !snapshotter.IsDataCollectionAllowed() { + snapshotter.StartDataCollection() + } + + w1 := httptest.NewRecorder() + req1 := httptest.NewRequest(http.MethodGet, "/", nil) + snapshotter.ResponseHandler(w1, req1) + assert.Equal(t, http.StatusTooManyRequests, w1.Code) + + snapshotter.SetNodeGroupInfo(nil) + snapshotter.Flush() + wg.Wait() + + assert.Equal(t, http.StatusOK, w.Code) +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0c067bf71818..96443b952f4d 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -29,6 +29,8 @@ import ( "syscall" "time" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -181,7 +183,8 @@ var ( daemonSetEvictionForOccupiedNodes = flag.Bool("daemonset-eviction-for-occupied-nodes", true, "DaemonSet pods will be gracefully terminated from non-empty nodes") userAgent = flag.String("user-agent", "cluster-autoscaler", "User agent used for HTTP calls.") - emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") + emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") + debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -304,17 +307,18 @@ func registerSignalHandlers(autoscaler core.Autoscaler) { }() } -func buildAutoscaler() (core.Autoscaler, error) { +func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) { // Create basic config from flags. autoscalingOptions := createAutoscalingOptions() kubeClient := createKubeClient(getKubeConfig()) eventsKubeClient := createKubeClient(getKubeConfig()) opts := core.AutoscalerOptions{ - AutoscalingOptions: autoscalingOptions, - ClusterSnapshot: simulator.NewDeltaClusterSnapshot(), - KubeClient: kubeClient, - EventsKubeClient: eventsKubeClient, + AutoscalingOptions: autoscalingOptions, + ClusterSnapshot: simulator.NewDeltaClusterSnapshot(), + KubeClient: kubeClient, + EventsKubeClient: eventsKubeClient, + DebuggingSnapshotter: debuggingSnapshotter, } opts.Processors = ca_processors.DefaultProcessors() @@ -345,10 +349,10 @@ func buildAutoscaler() (core.Autoscaler, error) { return core.NewAutoscaler(opts) } -func run(healthCheck *metrics.HealthCheck) { +func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) { metrics.RegisterAll(*emitPerNodeGroupMetrics) - autoscaler, err := buildAutoscaler() + autoscaler, err := buildAutoscaler(debuggingSnapshotter) if err != nil { klog.Fatalf("Failed to create autoscaler: %v", err) } @@ -400,12 +404,17 @@ func main() { klog.V(1).Infof("Cluster Autoscaler %s", version.ClusterAutoscalerVersion) + debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(*debuggingSnapshotEnabled) + go func() { pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler") defaultMetricsHandler := legacyregistry.Handler().ServeHTTP pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { defaultMetricsHandler(w, req) }) + if *debuggingSnapshotEnabled { + pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler) + } pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP) if *enableProfiling { routes.Profiling{}.Install(pathRecorderMux) @@ -415,7 +424,7 @@ func main() { }() if !leaderElection.LeaderElect { - run(healthCheck) + run(healthCheck, debuggingSnapshotter) } else { id, err := os.Hostname() if err != nil { @@ -455,7 +464,7 @@ func main() { OnStartedLeading: func(_ ctx.Context) { // Since we are committing a suicide after losing // mastership, we can safely ignore the argument. - run(healthCheck) + run(healthCheck, debuggingSnapshotter) }, OnStoppedLeading: func() { klog.Fatalf("lost master")