diff --git a/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml b/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml index 5bff01345b46..2a7bdaef15e4 100644 --- a/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml +++ b/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml @@ -134,6 +134,30 @@ data: "intervalFactor": 1, "legendFormat": "Requested Pods", "refId": "C" + }, + { + "expr": "sum(autoscaler_pending_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Pending Pods", + "refId": "P" + }, + { + "expr": "sum(autoscaler_not_ready_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "NotReady Pods", + "refId": "N" + }, + { + "expr": "sum(autoscaler_terminating_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Terminating Pods", + "refId": "T" } ], "thresholds": [], @@ -172,7 +196,88 @@ data: "show": false } ] - } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "prometheus", + "fill": 1, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 2 + }, + "id": 3, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": true, + "targets": [ + { + "expr": "sum(activator_request_concurrency{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Request Concurrency", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Request Concurrency", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + } ], "title": "Revision Pod Counts", "type": "row" diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 985e314ad98f..f9e304b6f824 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -51,7 +51,7 @@ type Autoscaler struct { // specMux guards the current DeciderSpec and the PodCounter. 
specMux sync.RWMutex deciderSpec *DeciderSpec - podCounter resources.ReadyPodCounter + podCounter resources.EndpointsCounter } // New creates a new instance of autoscaler @@ -246,7 +246,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount return desiredPodCount, excessBC, true } -func (a *Autoscaler) currentSpecAndPC() (*DeciderSpec, resources.ReadyPodCounter) { +func (a *Autoscaler) currentSpecAndPC() (*DeciderSpec, resources.EndpointsCounter) { a.specMux.RLock() defer a.specMux.RUnlock() return a.deciderSpec, a.podCounter diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index 8769b38f7ff6..0c77e7aa3c85 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -42,10 +42,10 @@ var ( kubeInformer = kubeinformers.NewSharedInformerFactory(kubeClient, 0) ) -func TestNewErrorWhenGivenNilReadyPodCounter(t *testing.T) { +func TestNewErrorWhenGivenNilEndpointsCounter(t *testing.T) { _, err := New(testNamespace, testRevision, &autoscalerfake.MetricClient{}, nil, &DeciderSpec{TargetValue: 10, ServiceName: testService}, &mockReporter{}) if err == nil { - t.Error("Expected error when ReadyPodCounter interface is nil, but got none.") + t.Error("Expected error when EndpointsCounter interface is nil, but got none.") } } @@ -335,6 +335,21 @@ func (r *mockReporter) ReportActualPodCount(v int64) error { return nil } +// ReportNotReadyPodCount of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportNotReadyPodCount(v int64) error { + return nil +} + +// ReportPendingPodCount of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportPendingPodCount(v int64) error { + return nil +} + +// ReportTerminatingPodCount of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportTerminatingPodCount(v int64) error { + return nil +} + // ReportStableRequestConcurrency of a mockReporter does nothing and return nil for error. 
func (r *mockReporter) ReportStableRequestConcurrency(v float64) error { return nil diff --git a/pkg/autoscaler/stats_reporter.go b/pkg/autoscaler/stats_reporter.go index 59b5f864ad4e..20622865812f 100644 --- a/pkg/autoscaler/stats_reporter.go +++ b/pkg/autoscaler/stats_reporter.go @@ -42,6 +42,18 @@ var ( "actual_pods", "Number of pods that are allocated currently", stats.UnitDimensionless) + notReadyPodCountM = stats.Int64( + "not_ready_pods", + "Number of pods that are not ready currently", + stats.UnitDimensionless) + pendingPodCountM = stats.Int64( + "pending_pods", + "Number of pods that are pending currently", + stats.UnitDimensionless) + terminatingPodCountM = stats.Int64( + "terminating_pods", + "Number of pods that are terminating currently", + stats.UnitDimensionless) excessBurstCapacityM = stats.Float64( "excess_burst_capacity", "Excess burst capacity overserved over the stable window", @@ -103,6 +115,24 @@ func register() { Aggregation: view.LastValue(), TagKeys: metrics.CommonRevisionKeys, }, + &view.View{ + Description: "Number of pods that are not ready currently", + Measure: notReadyPodCountM, + Aggregation: view.LastValue(), + TagKeys: metrics.CommonRevisionKeys, + }, + &view.View{ + Description: "Number of pods that are pending currently", + Measure: pendingPodCountM, + Aggregation: view.LastValue(), + TagKeys: metrics.CommonRevisionKeys, + }, + &view.View{ + Description: "Number of pods that are terminating currently", + Measure: terminatingPodCountM, + Aggregation: view.LastValue(), + TagKeys: metrics.CommonRevisionKeys, + }, &view.View{ Description: "Average of requests count over the stable window", Measure: stableRequestConcurrencyM, @@ -161,6 +191,9 @@ type StatsReporter interface { ReportDesiredPodCount(v int64) error ReportRequestedPodCount(v int64) error ReportActualPodCount(v int64) error + ReportNotReadyPodCount(v int64) error + ReportTerminatingPodCount(v int64) error + ReportPendingPodCount(v int64) error ReportStableRequestConcurrency(v float64) error ReportPanicRequestConcurrency(v float64) error ReportTargetRequestConcurrency(v float64) error @@ -221,6 +254,21 @@ func (r *Reporter) ReportActualPodCount(v int64) error { return r.report(actualPodCountM.M(v)) } +// ReportNotReadyPodCount captures value v for not ready pod count measure. +func (r *Reporter) ReportNotReadyPodCount(v int64) error { + return r.report(notReadyPodCountM.M(v)) +} + +// ReportPendingPodCount captures value v for pending pod count measure. +func (r *Reporter) ReportPendingPodCount(v int64) error { + return r.report(pendingPodCountM.M(v)) +} + +// ReportTerminatingPodCount captures value v for terminating pod count measure. +func (r *Reporter) ReportTerminatingPodCount(v int64) error { + return r.report(terminatingPodCountM.M(v)) +} + // ReportExcessBurstCapacity captures value v for excess target burst capacity. 
func (r *Reporter) ReportExcessBurstCapacity(v float64) error { return r.report(excessBurstCapacityM.M(v)) diff --git a/pkg/autoscaler/stats_reporter_test.go b/pkg/autoscaler/stats_reporter_test.go index 499b5255ae87..00c14c920bd2 100644 --- a/pkg/autoscaler/stats_reporter_test.go +++ b/pkg/autoscaler/stats_reporter_test.go @@ -58,6 +58,9 @@ func TestReporterReport(t *testing.T) { expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(10) }) expectSuccess(t, "ReportRequestedPodCount", func() error { return r.ReportRequestedPodCount(7) }) expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(5) }) + expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(9) }) + expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(6) }) + expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) }) expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) }) expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) }) expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) }) @@ -69,6 +72,9 @@ func TestReporterReport(t *testing.T) { metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10) metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7) metricstest.CheckLastValueData(t, "actual_pods", wantTags, 5) + metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 9) + metricstest.CheckLastValueData(t, "pending_pods", wantTags, 6) + metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8) metricstest.CheckLastValueData(t, "panic_mode", wantTags, 0) metricstest.CheckLastValueData(t, "stable_request_concurrency", wantTags, 2) metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84) @@ -94,6 +100,21 @@ func TestReporterReport(t *testing.T) { expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(9) }) metricstest.CheckLastValueData(t, "actual_pods", wantTags, 9) + expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(6) }) + expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(5) }) + expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(4) }) + metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 4) + + expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(3) }) + expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(2) }) + expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(1) }) + metricstest.CheckLastValueData(t, "pending_pods", wantTags, 1) + + expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(5) }) + expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(3) }) + expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) }) + metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8) + expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) }) expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) }) expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) }) @@ -131,6 +152,9 @@ func resetMetrics() { desiredPodCountM.Name(), 
requestedPodCountM.Name(), actualPodCountM.Name(), + notReadyPodCountM.Name(), + pendingPodCountM.Name(), + terminatingPodCountM.Name(), stableRequestConcurrencyM.Name(), panicRequestConcurrencyM.Name(), excessBurstCapacityM.Name(), diff --git a/pkg/autoscaler/stats_scraper.go b/pkg/autoscaler/stats_scraper.go index 3c2069052fdd..0716764b6a84 100644 --- a/pkg/autoscaler/stats_scraper.go +++ b/pkg/autoscaler/stats_scraper.go @@ -86,13 +86,13 @@ var cacheDisabledClient = &http.Client{ // for details. type ServiceScraper struct { sClient scrapeClient - counter resources.ReadyPodCounter + counter resources.EndpointsCounter url string } // NewServiceScraper creates a new StatsScraper for the Revision which // the given Metric is responsible for. -func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCounter) (*ServiceScraper, error) { +func NewServiceScraper(metric *av1alpha1.Metric, counter resources.EndpointsCounter) (*ServiceScraper, error) { sClient, err := newHTTPScrapeClient(cacheDisabledClient) if err != nil { return nil, err @@ -102,7 +102,7 @@ func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCount func newServiceScraperWithClient( metric *av1alpha1.Metric, - counter resources.ReadyPodCounter, + counter resources.EndpointsCounter, sClient scrapeClient) (*ServiceScraper, error) { if metric == nil { return nil, errors.New("metric must not be nil") diff --git a/pkg/autoscaler/stats_scraper_test.go b/pkg/autoscaler/stats_scraper_test.go index 31aa484d5744..8308bee57312 100644 --- a/pkg/autoscaler/stats_scraper_test.go +++ b/pkg/autoscaler/stats_scraper_test.go @@ -80,7 +80,7 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) { name string metric *av1alpha1.Metric client scrapeClient - counter resources.ReadyPodCounter + counter resources.EndpointsCounter expectedErr string }{{ name: "Empty Decider", diff --git a/pkg/reconciler/autoscaling/kpa/controller.go b/pkg/reconciler/autoscaling/kpa/controller.go index 53178cc4b286..024f07d49fc3 100644 --- a/pkg/reconciler/autoscaling/kpa/controller.go +++ b/pkg/reconciler/autoscaling/kpa/controller.go @@ -20,6 +20,7 @@ import ( "context" endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/serving/pkg/client/injection/ducks/autoscaling/v1alpha1/podscalable" metricinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric" @@ -51,6 +52,7 @@ func NewController( sksInformer := sksinformer.Get(ctx) serviceInformer := serviceinformer.Get(ctx) endpointsInformer := endpointsinformer.Get(ctx) + podsInformer := podinformer.Get(ctx) metricInformer := metricinformer.Get(ctx) psInformerFactory := podscalable.Get(ctx) @@ -64,6 +66,7 @@ func NewController( PSInformerFactory: psInformerFactory, }, endpointsLister: endpointsInformer.Lister(), + podsLister: podsInformer.Lister(), deciders: deciders, } impl := controller.NewImpl(c, c.Logger, "KPA-Class Autoscaling") diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go index 7846e3e85817..5830e2fe5da6 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa.go +++ b/pkg/reconciler/autoscaling/kpa/kpa.go @@ -42,11 +42,21 @@ import ( "k8s.io/client-go/tools/cache" ) +// podCounts keeps a record of the number of pods +// in each state for a revision. +type podCounts struct { + Ready int + NotReady int + Pending int +
Terminating int +} + // Reconciler tracks PAs and right sizes the ScaleTargetRef based on the // information from Deciders. type Reconciler struct { *areconciler.Base endpointsLister corev1listers.EndpointsLister + podsLister corev1listers.PodLister deciders resources.Deciders scaler *scaler } @@ -130,7 +140,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler if _, err = c.ReconcileSKS(ctx, pa, nv1alpha1.SKSOperationModeServe); err != nil { return fmt.Errorf("error reconciling SKS: %w", err) } - return computeStatus(pa, scaleUnknown, 0) + return computeStatus(pa, scaleUnknown, podCounts{}) } pa.Status.MetricsServiceName = sks.Status.PrivateServiceName @@ -177,20 +187,40 @@ func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler // Compare the desired and observed resources to determine our situation. // We fetch private endpoints here, since for scaling we're interested in the actual // state of the deployment. - got := 0 + ready, notReady := 0, 0 // Propagate service name. pa.Status.ServiceName = sks.Status.ServiceName // Currently, SKS.IsReady==True when revision has >0 ready pods. if sks.Status.IsReady() { - podCounter := resourceutil.NewScopedEndpointsCounter(c.endpointsLister, pa.Namespace, sks.Status.PrivateServiceName) - got, err = podCounter.ReadyCount() + podEndpointCounter := resourceutil.NewScopedEndpointsCounter(c.endpointsLister, pa.Namespace, sks.Status.PrivateServiceName) + ready, err = podEndpointCounter.ReadyCount() + if err != nil { + return fmt.Errorf("error checking endpoints %s: %w", sks.Status.PrivateServiceName, err) + } + + notReady, err = podEndpointCounter.NotReadyCount() if err != nil { return fmt.Errorf("error checking endpoints %s: %w", sks.Status.PrivateServiceName, err) } } - logger.Infof("PA scale got=%d, want=%d, ebc=%d", got, want, decider.Status.ExcessBurstCapacity) - return computeStatus(pa, want, got) + + podCounter := resourceutil.NewNotRunningPodsCounter(c.podsLister, pa.Namespace, sks.Status.ServiceName) + pending, terminating, err := podCounter.PendingTerminatingCount() + if err != nil { + return fmt.Errorf("error checking pods %s: %w", sks.Status.ServiceName, err) + } + + logger.Infof("PA scale got=%d, want=%d, ebc=%d", ready, want, decider.Status.ExcessBurstCapacity) + + pc := podCounts{ + Ready: ready, + NotReady: notReady, + Pending: pending, + Terminating: terminating, + } + logger.Infof("Observed pod counts=%#v", pc) + return computeStatus(pa, want, pc) } func (c *Reconciler) reconcileDecider(ctx context.Context, pa *pav1alpha1.PodAutoscaler, k8sSvc string) (*autoscaler.Decider, error) { @@ -217,20 +247,20 @@ func (c *Reconciler) reconcileDecider(ctx context.Context, pa *pav1alpha1.PodAut return decider, nil } -func computeStatus(pa *pav1alpha1.PodAutoscaler, want int32, got int) error { - pa.Status.DesiredScale, pa.Status.ActualScale = &want, ptr.Int32(int32(got)) +func computeStatus(pa *pav1alpha1.PodAutoscaler, want int32, pc podCounts) error { + pa.Status.DesiredScale, pa.Status.ActualScale = &want, ptr.Int32(int32(pc.Ready)) - if err := reportMetrics(pa, want, got); err != nil { + if err := reportMetrics(pa, want, pc); err != nil { return fmt.Errorf("error reporting metrics: %w", err) } - computeActiveCondition(pa, want, got) + computeActiveCondition(pa, want, pc.Ready) pa.Status.ObservedGeneration = pa.Generation return nil } -func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, got int) error { +func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, pc
podCounts) error { var serviceLabel string var configLabel string if pa.Labels != nil { @@ -242,7 +272,10 @@ func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, got int) error { return err } - reporter.ReportActualPodCount(int64(got)) + reporter.ReportActualPodCount(int64(pc.Ready)) + reporter.ReportNotReadyPodCount(int64(pc.NotReady)) + reporter.ReportPendingPodCount(int64(pc.Pending)) + reporter.ReportTerminatingPodCount(int64(pc.Terminating)) // Negative "want" values represent an empty metrics pipeline and thus no specific request is being made. if want >= 0 { reporter.ReportRequestedPodCount(int64(want)) diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index a22e770e7ac9..13a0ac01020e 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -29,6 +29,7 @@ import ( // These are the fake informers we want setup. fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" fakeendpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" "knative.dev/pkg/kmeta" @@ -985,6 +986,7 @@ func TestReconcile(t *testing.T) { PSInformerFactory: psf, }, endpointsLister: listers.GetEndpointsLister(), + podsLister: listers.GetPodsLister(), deciders: fakeDeciders, scaler: scaler, } diff --git a/pkg/reconciler/testing/v1alpha1/listers.go b/pkg/reconciler/testing/v1alpha1/listers.go index 3c9abaaf5a77..0b3e08f03446 100644 --- a/pkg/reconciler/testing/v1alpha1/listers.go +++ b/pkg/reconciler/testing/v1alpha1/listers.go @@ -193,6 +193,11 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.IndexerFor(&corev1.Endpoints{})) } +// GetPodsLister gets the lister for pods. +func (l *Listers) GetPodsLister() corev1listers.PodLister { + return corev1listers.NewPodLister(l.IndexerFor(&corev1.Pod{})) +} + func (l *Listers) GetSecretLister() corev1listers.SecretLister { return corev1listers.NewSecretLister(l.IndexerFor(&corev1.Secret{})) } diff --git a/pkg/resources/endpoints.go b/pkg/resources/endpoints.go index 0366daef6f69..d9684eef98b6 100644 --- a/pkg/resources/endpoints.go +++ b/pkg/resources/endpoints.go @@ -23,23 +23,33 @@ import ( // ReadyAddressCount returns the total number of addresses ready for the given endpoint. func ReadyAddressCount(endpoints *corev1.Endpoints) int { - var total int + var ready int for _, subset := range endpoints.Subsets { - total += len(subset.Addresses) + ready += len(subset.Addresses) } - return total + return ready } -// ReadyPodCounter provides a count of currently ready pods. This +// NotReadyAddressCount returns the total number of not-ready addresses for the given endpoint. +func NotReadyAddressCount(endpoints *corev1.Endpoints) int { + var notReady int + for _, subset := range endpoints.Subsets { + notReady += len(subset.NotReadyAddresses) + } + return notReady +} + +// EndpointsCounter provides a count of currently ready and not-ready pods. This // information is used by UniScaler implementations to make scaling // decisions. The interface prevents the UniScaler from needing to // know how counts are performed. // The int return value represents the number of pods that are ready // to handle incoming requests.
-// The error value is returned if the ReadyPodCounter is unable to +// The error value is returned if the EndpointsCounter is unable to // calculate a value. -type ReadyPodCounter interface { +type EndpointsCounter interface { ReadyCount() (int, error) + NotReadyCount() (int, error) } type scopedEndpointCounter struct { @@ -56,13 +66,21 @@ func (eac *scopedEndpointCounter) ReadyCount() (int, error) { return ReadyAddressCount(endpoints), nil } -// NewScopedEndpointsCounter creates a ReadyPodCounter that uses +func (eac *scopedEndpointCounter) NotReadyCount() (int, error) { + endpoints, err := eac.endpointsLister.Endpoints(eac.namespace).Get(eac.serviceName) + if err != nil { + return 0, err + } + return NotReadyAddressCount(endpoints), nil +} + +// NewScopedEndpointsCounter creates an EndpointsCounter that uses // a count of endpoints for a namespace/serviceName as the value // of ready pods. The values returned by ReadyCount() will vary // over time. // lister is used to retrieve endpoints for counting with the // scope of namespace/serviceName. -func NewScopedEndpointsCounter(lister corev1listers.EndpointsLister, namespace, serviceName string) ReadyPodCounter { +func NewScopedEndpointsCounter(lister corev1listers.EndpointsLister, namespace, serviceName string) EndpointsCounter { return &scopedEndpointCounter{ endpointsLister: lister, namespace: namespace, diff --git a/pkg/resources/endpoints_test.go b/pkg/resources/endpoints_test.go index 370c80ff3de9..59469ae9050c 100644 --- a/pkg/resources/endpoints_test.go +++ b/pkg/resources/endpoints_test.go @@ -42,27 +42,32 @@ func TestScopedEndpointsCounter(t *testing.T) { addressCounter := NewScopedEndpointsCounter(endpointsClient.Lister(), testNamespace, testService) tests := []struct { - name string - endpoints *corev1.Endpoints - want int - wantErr bool + name string + endpoints *corev1.Endpoints + wantReady int + wantNotReady int + wantErr bool }{{ - name: "no endpoints at all", - endpoints: nil, - want: 0, - wantErr: true, + name: "no endpoints at all", + endpoints: nil, + wantReady: 0, + wantNotReady: 0, + wantErr: true, }, { - name: "no ready addresses", - endpoints: endpoints(0), - want: 0, + name: "no ready/not-ready addresses", + endpoints: endpoints(0, 0), + wantReady: 0, + wantNotReady: 0, }, { - name: "one ready address", - endpoints: endpoints(1), - want: 1, + name: "one ready/two not-ready addresses", + endpoints: endpoints(1, 2), + wantReady: 1, + wantNotReady: 2, }, { - name: "ten ready addresses", - endpoints: endpoints(10), - want: 10, + name: "ten ready/twenty not-ready addresses", + endpoints: endpoints(10, 20), + wantReady: 10, + wantNotReady: 20, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.endpoints != nil { createEndpoints(test.endpoints) } got, err := addressCounter.ReadyCount() - if got != test.want { - t.Errorf("ReadyCount() = %d, want: %d", got, test.want) + if got != test.wantReady { + t.Errorf("ReadyCount() = %d, wantReady: %d", got, test.wantReady) + } + if got, want := (err != nil), test.wantErr; got != want { + t.Errorf("WantErr = %v, want: %v, err: %v", got, want, err) + } + + got, err = addressCounter.NotReadyCount() + if got != test.wantNotReady { + t.Errorf("NotReadyCount() = %d, wantNotReady: %d", got, test.wantNotReady) } if got, want := (err != nil), test.wantErr; got != want { t.Errorf("WantErr = %v, want: %v, err: %v", got, want, err) @@ -83,45 +96,60 @@ func TestScopedEndpointsCounter(t *testing.T) { func TestReadyAddressCount(t *testing.T) { tests := []struct { - name
string - endpoints *corev1.Endpoints - want int + name string + endpoints *corev1.Endpoints + wantReady int + wantNotReady int }{{ - name: "no ready addresses", - endpoints: endpoints(0), - want: 0, + name: "no ready/not-ready addresses", + endpoints: endpoints(0, 0), + wantReady: 0, + wantNotReady: 0, }, { - name: "one ready address", - endpoints: endpoints(1), - want: 1, + name: "one ready/two not-ready addresses", + endpoints: endpoints(1, 2), + wantReady: 1, + wantNotReady: 2, }, { - name: "ten ready addresses", - endpoints: endpoints(10), - want: 10, + name: "ten ready/twenty not-ready addresses", + endpoints: endpoints(10, 20), + wantReady: 10, + wantNotReady: 20, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - if got := ReadyAddressCount(test.endpoints); got != test.want { - t.Errorf("ReadyAddressCount() = %d, want: %d", got, test.want) + if got := ReadyAddressCount(test.endpoints); got != test.wantReady { + t.Errorf("ReadyAddressCount() = %d, want: %d", got, test.wantReady) + } + if got := NotReadyAddressCount(test.endpoints); got != test.wantNotReady { + t.Errorf("NotReadyAddressCount() = %d, want: %d", got, test.wantNotReady) } }) } } -func endpoints(ipCount int) *corev1.Endpoints { +func endpoints(readyIPCount, notReadyIPCount int) *corev1.Endpoints { ep := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: testService, }, } - addresses := make([]corev1.EndpointAddress, ipCount) - for i := 0; i < ipCount; i++ { - addresses[i] = corev1.EndpointAddress{IP: fmt.Sprintf("127.0.0.%v", i+1)} + addresses := make([]corev1.EndpointAddress, readyIPCount) + notReadyAddresses := make([]corev1.EndpointAddress, notReadyIPCount) + + for i := 0; i < readyIPCount; i++ { + addresses[i] = corev1.EndpointAddress{IP: fmt.Sprintf("127.0.0.%v", i*3+1)} } + + for i := 0; i < notReadyIPCount; i++ { + notReadyAddresses[i] = corev1.EndpointAddress{IP: fmt.Sprintf("127.0.0.%v", i*3+2)} + } + ep.Subsets = []corev1.EndpointSubset{{ - Addresses: addresses, + Addresses: addresses, + NotReadyAddresses: notReadyAddresses, }} return ep } diff --git a/pkg/resources/pods.go b/pkg/resources/pods.go new file mode 100644 index 000000000000..40684fd9bb76 --- /dev/null +++ b/pkg/resources/pods.go @@ -0,0 +1,70 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/serving/pkg/apis/serving" +) + +// notRunningPodCounter provides a count of pods currently not in the +// Running state. It exempts users from needing to +// know how counts are performed. +type notRunningPodCounter struct { + podsLister corev1listers.PodLister + namespace string + serviceName string +} + +// NewNotRunningPodsCounter creates a NotRunningPodCounter that counts +// pods for a namespace/serviceName. The values returned by +// PendingTerminatingCount() will vary over time.
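+// +// A hypothetical usage sketch, for illustration only (podsLister and the +// names below are placeholders, not part of this change): +// +// pc := NewNotRunningPodsCounter(podsLister, "default", "my-revision") +// pending, terminating, err := pc.PendingTerminatingCount()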
+func NewNotRunningPodsCounter(lister corev1listers.PodLister, namespace, serviceName string) notRunningPodCounter { + return notRunningPodCounter{ + podsLister: lister, + namespace: namespace, + serviceName: serviceName, + } +} + +// PendingTerminatingCount returns the number of pods in a Pending or +// Terminating state. +func (pc *notRunningPodCounter) PendingTerminatingCount() (int, int, error) { + pods, err := pc.podsLister.Pods(pc.namespace).List(labels.SelectorFromSet(labels.Set{ + serving.RevisionLabelKey: pc.serviceName, + })) + if err != nil { + return 0, 0, err + } + + pending, terminating := pendingTerminatingCount(pods) + return pending, terminating, nil +} + +func pendingTerminatingCount(pods []*corev1.Pod) (int, int) { + pending, terminating := 0, 0 + for _, pod := range pods { + if pod.ObjectMeta.DeletionTimestamp != nil && pod.Status.Phase == corev1.PodRunning { + terminating++ + } else if pod.Status.Phase == corev1.PodPending { + pending++ + } + } + return pending, terminating +} diff --git a/pkg/resources/pods_test.go b/pkg/resources/pods_test.go new file mode 100644 index 000000000000..61814dac10c3 --- /dev/null +++ b/pkg/resources/pods_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + fakek8s "k8s.io/client-go/kubernetes/fake" + "knative.dev/serving/pkg/apis/serving" +) + +func TestScopedPodsCounter(t *testing.T) { + kubeClient := fakek8s.NewSimpleClientset() + podsClient := kubeinformers.NewSharedInformerFactory(kubeClient, 0).Core().V1().Pods() + createPods := func(pods []*corev1.Pod) { + for _, p := range pods { + kubeClient.CoreV1().Pods(testNamespace).Create(p) + podsClient.Informer().GetIndexer().Add(p) + } + } + + podCounter := NewNotRunningPodsCounter(podsClient.Lister(), testNamespace, testService) + + tests := []struct { + name string + pods []*corev1.Pod + wantRunning int + wantPending int + wantTerminating int + wantErr bool + }{{ + name: "no pods", + pods: pods(0, 0, 0), + wantRunning: 0, + wantPending: 0, + wantTerminating: 0, + }, { + name: "one running/two pending/three terminating pods", + pods: pods(1, 2, 3), + wantRunning: 1, + wantPending: 2, + wantTerminating: 3, + }, { + name: "ten running/eleven pending/twelve terminating pods", + pods: pods(10, 11, 12), + wantRunning: 10, + wantPending: 11, + wantTerminating: 12, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.pods != nil { + createPods(test.pods) + } + + pending, terminating, err := podCounter.PendingTerminatingCount() + if got, want := (err != nil), test.wantErr; got != want { + t.Errorf("WantErr = %v, want: %v, err: %v", got, want, err) + } + + if pending != test.wantPending { + t.Errorf("PendingCount() = %d, want: %d", pending, test.wantPending) + } + + if terminating != test.wantTerminating { + t.Errorf("TerminatingCount() = %d,
want: %d", terminating, test.wantTerminating) + } + + }) + } +} + +func pods(running, pending, terminating int) []*corev1.Pod { + pods := make([]*corev1.Pod, running) + + for i := 0; i < running; i++ { + // running pod + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("running-pod-%d", i), + Namespace: testNamespace, + Labels: map[string]string{serving.RevisionLabelKey: testService}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + pods[i] = p + } + + for i := 0; i < terminating; i++ { + // terminating pod + now := metav1.Now() + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("terminating-pod-%d", i), + Namespace: testNamespace, + DeletionTimestamp: &now, + Labels: map[string]string{serving.RevisionLabelKey: testService}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + pods = append(pods, p) + } + + for i := 0; i < pending; i++ { + // pending pod + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pending-pod-%d", i), + Namespace: testNamespace, + Labels: map[string]string{serving.RevisionLabelKey: testService}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + pods = append(pods, p) + } + + return pods +}