Skip to content

Commit

Permalink
add podsLister to kpa and report pod state metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nimakaviani committed Jan 9, 2020
1 parent d4c2097 commit 627e676
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,30 @@ data:
"intervalFactor": 1,
"legendFormat": "Requested Pods",
"refId": "C"
},
{
"expr": "sum(autoscaler_pending_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Pending Pods",
"refId": "P"
},
{
"expr": "sum(autoscaler_not_ready_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "NotReady Pods",
"refId": "N"
},
{
"expr": "sum(autoscaler_terminating_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Terminating Pods",
"refId": "T"
}
],
"thresholds": [],
Expand Down Expand Up @@ -172,7 +196,88 @@ data:
"show": false
}
]
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "prometheus",
"fill": 1,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 2
},
"id": 3,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": true,
"targets": [
{
"expr": "sum(activator_request_concurrency{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Request Concurrency",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Request Concurrency",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
}
],
"title": "Revision Pod Counts",
"type": "row"
Expand Down
15 changes: 15 additions & 0 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,21 @@ func (r *mockReporter) ReportActualPodCount(v int64) error {
return nil
}

// ReportNotReadyPodCount is a no-op on mockReporter: it ignores v and always returns nil.
func (r *mockReporter) ReportNotReadyPodCount(v int64) error { return nil }

// ReportPendingPodCount is a no-op on mockReporter: it ignores v and always returns nil.
func (r *mockReporter) ReportPendingPodCount(v int64) error { return nil }

// ReportTerminatingPodCount is a no-op on mockReporter: it ignores v and always returns nil.
func (r *mockReporter) ReportTerminatingPodCount(v int64) error { return nil }

// ReportStableRequestConcurrency of a mockReporter does nothing and returns nil for error.
func (r *mockReporter) ReportStableRequestConcurrency(v float64) error {
return nil
Expand Down
45 changes: 45 additions & 0 deletions pkg/autoscaler/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ var (
"actual_pods",
"Number of pods that are allocated currently",
stats.UnitDimensionless)
notReadyPodCountM = stats.Int64(
"not_ready_pods",
"Number of pods that are not ready currently",
stats.UnitDimensionless)
pendingPodCountM = stats.Int64(
"pending_pods",
"Number of pods that are pending currently",
stats.UnitDimensionless)
terminatingPodCountM = stats.Int64(
"terminating_pods",
"Number of pods that are terminating currently",
stats.UnitDimensionless)
excessBurstCapacityM = stats.Float64(
"excess_burst_capacity",
"Excess burst capacity overserved over the stable window",
Expand Down Expand Up @@ -103,6 +115,24 @@ func register() {
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are not ready currently",
Measure: notReadyPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are pending currently",
Measure: pendingPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are terminating currently",
Measure: terminatingPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Average of requests count over the stable window",
Measure: stableRequestConcurrencyM,
Expand Down Expand Up @@ -161,6 +191,9 @@ type StatsReporter interface {
ReportDesiredPodCount(v int64) error
ReportRequestedPodCount(v int64) error
ReportActualPodCount(v int64) error
ReportNotReadyPodCount(v int64) error
ReportTerminatingPodCount(v int64) error
ReportPendingPodCount(v int64) error
ReportStableRequestConcurrency(v float64) error
ReportPanicRequestConcurrency(v float64) error
ReportTargetRequestConcurrency(v float64) error
Expand Down Expand Up @@ -221,6 +254,18 @@ func (r *Reporter) ReportActualPodCount(v int64) error {
return r.report(actualPodCountM.M(v))
}

// ReportNotReadyPodCount captures value v for the not-ready pod count measure.
// It returns whatever error r.report yields for the resulting measurement.
func (r *Reporter) ReportNotReadyPodCount(v int64) error {
	measurement := notReadyPodCountM.M(v)
	return r.report(measurement)
}

// ReportPendingPodCount captures value v for the pending pod count measure.
// It returns whatever error r.report yields for the resulting measurement.
func (r *Reporter) ReportPendingPodCount(v int64) error {
	measurement := pendingPodCountM.M(v)
	return r.report(measurement)
}

// ReportTerminatingPodCount captures value v for the terminating pod count measure.
// It returns whatever error r.report yields for the resulting measurement.
func (r *Reporter) ReportTerminatingPodCount(v int64) error {
	measurement := terminatingPodCountM.M(v)
	return r.report(measurement)
}

// ReportExcessBurstCapacity captures value v for excess target burst capacity.
func (r *Reporter) ReportExcessBurstCapacity(v float64) error {
return r.report(excessBurstCapacityM.M(v))
Expand Down
24 changes: 24 additions & 0 deletions pkg/autoscaler/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(10) })
expectSuccess(t, "ReportRequestedPodCount", func() error { return r.ReportRequestedPodCount(7) })
expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(5) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(9) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(6) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) })
expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) })
expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) })
Expand All @@ -69,6 +72,9 @@ func TestReporterReport(t *testing.T) {
metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10)
metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7)
metricstest.CheckLastValueData(t, "actual_pods", wantTags, 5)
metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 9)
metricstest.CheckLastValueData(t, "pending_pods", wantTags, 6)
metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8)
metricstest.CheckLastValueData(t, "panic_mode", wantTags, 0)
metricstest.CheckLastValueData(t, "stable_request_concurrency", wantTags, 2)
metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84)
Expand All @@ -94,6 +100,21 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(9) })
metricstest.CheckLastValueData(t, "actual_pods", wantTags, 9)

expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(6) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(5) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(4) })
metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 4)

expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(3) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(2) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(1) })
metricstest.CheckLastValueData(t, "pending_pods", wantTags, 1)

expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(5) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(3) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) })
metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8)

expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) })
Expand Down Expand Up @@ -131,6 +152,9 @@ func resetMetrics() {
desiredPodCountM.Name(),
requestedPodCountM.Name(),
actualPodCountM.Name(),
notReadyPodCountM.Name(),
pendingPodCountM.Name(),
terminatingPodCountM.Name(),
stableRequestConcurrencyM.Name(),
panicRequestConcurrencyM.Name(),
excessBurstCapacityM.Name(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/autoscaling/kpa/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/serving/pkg/client/injection/ducks/autoscaling/v1alpha1/podscalable"
metricinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric"
Expand Down Expand Up @@ -51,6 +52,7 @@ func NewController(
sksInformer := sksinformer.Get(ctx)
serviceInformer := serviceinformer.Get(ctx)
endpointsInformer := endpointsinformer.Get(ctx)
podsInformer := podinformer.Get(ctx)
metricInformer := metricinformer.Get(ctx)
psInformerFactory := podscalable.Get(ctx)

Expand All @@ -64,6 +66,7 @@ func NewController(
PSInformerFactory: psInformerFactory,
},
endpointsLister: endpointsInformer.Lister(),
podsLister: podsInformer.Lister(),
deciders: deciders,
}
impl := controller.NewImpl(c, c.Logger, "KPA-Class Autoscaling")
Expand Down
41 changes: 29 additions & 12 deletions pkg/reconciler/autoscaling/kpa/kpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
type Reconciler struct {
// Embedded base reconciler.
// NOTE(review): appears to supply shared helpers used on the receiver
// (e.g. ReconcileSKS, Logger) — confirm against areconciler.Base.
*areconciler.Base
// endpointsLister is passed to resourceutil.NewScopedEndpointsCounter in
// reconcile to obtain the ready and not-ready endpoint counts.
endpointsLister corev1listers.EndpointsLister
// podsLister is passed to resourceutil.NewScopedPodsCounter in reconcile to
// obtain the pending and terminating pod counts.
podsLister corev1listers.PodLister
// NOTE(review): presumably the Decider store consulted by reconcileDecider;
// its usage is not visible in this view — confirm.
deciders resources.Deciders
// NOTE(review): usage is not visible in this view; presumably applies the
// desired scale to the target — confirm.
scaler *scaler
}
Expand Down Expand Up @@ -130,7 +131,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler
if _, err = c.ReconcileSKS(ctx, pa, nv1alpha1.SKSOperationModeServe); err != nil {
return fmt.Errorf("error reconciling SKS: %w", err)
}
return computeStatus(pa, scaleUnknown, 0)
return computeStatus(pa, scaleUnknown, 0, 0, 0, 0)
}

pa.Status.MetricsServiceName = sks.Status.PrivateServiceName
Expand Down Expand Up @@ -177,20 +178,33 @@ func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler
// Compare the desired and observed resources to determine our situation.
// We fetch private endpoints here, since for scaling we're interested in the actual
// state of the deployment.
got := 0
ready, notReady, pending, terminating := 0, 0, 0, 0

// Propagate service name.
pa.Status.ServiceName = sks.Status.ServiceName
// Currently, SKS.IsReady==True when revision has >0 ready pods.
if sks.Status.IsReady() {
podCounter := resourceutil.NewScopedEndpointsCounter(c.endpointsLister, pa.Namespace, sks.Status.PrivateServiceName)
got, err = podCounter.ReadyCount()
podEndpointCounter := resourceutil.NewScopedEndpointsCounter(c.endpointsLister, pa.Namespace, sks.Status.PrivateServiceName)
ready, err = podEndpointCounter.ReadyCount()
if err != nil {
return fmt.Errorf("error checking endpoints %s: %w", sks.Status.PrivateServiceName, err)
}

notReady, err = podEndpointCounter.NotReadyCount()
if err != nil {
return fmt.Errorf("error checking endpoints %s: %w", sks.Status.PrivateServiceName, err)
}
}
logger.Infof("PA scale got=%d, want=%d, ebc=%d", got, want, decider.Status.ExcessBurstCapacity)
return computeStatus(pa, want, got)

podCounter := resourceutil.NewScopedPodsCounter(c.podsLister, pa.Namespace, sks.Status.ServiceName)
err = podCounter.Get()
if err != nil {
return fmt.Errorf("error checking pods %s: %w", sks.Status.PrivateServiceName, err)
}
pending, terminating = podCounter.PendingCount(), podCounter.TerminatingCount()

logger.Infof("PA scale got=%d, want=%d, ebc=%d", ready, want, decider.Status.ExcessBurstCapacity)
return computeStatus(pa, want, ready, notReady, pending, terminating)
}

func (c *Reconciler) reconcileDecider(ctx context.Context, pa *pav1alpha1.PodAutoscaler, k8sSvc string) (*autoscaler.Decider, error) {
Expand All @@ -217,20 +231,20 @@ func (c *Reconciler) reconcileDecider(ctx context.Context, pa *pav1alpha1.PodAut
return decider, nil
}

func computeStatus(pa *pav1alpha1.PodAutoscaler, want int32, got int) error {
pa.Status.DesiredScale, pa.Status.ActualScale = &want, ptr.Int32(int32(got))
func computeStatus(pa *pav1alpha1.PodAutoscaler, want int32, ready int, notReady int, pending int, terminating int) error {
pa.Status.DesiredScale, pa.Status.ActualScale = &want, ptr.Int32(int32(ready))

if err := reportMetrics(pa, want, got); err != nil {
if err := reportMetrics(pa, want, ready, notReady, pending, terminating); err != nil {
return fmt.Errorf("error reporting metrics: %w", err)
}

computeActiveCondition(pa, want, got)
computeActiveCondition(pa, want, ready)

pa.Status.ObservedGeneration = pa.Generation
return nil
}

func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, got int) error {
func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, ready, notReady, pending, terminating int) error {
var serviceLabel string
var configLabel string
if pa.Labels != nil {
Expand All @@ -242,7 +256,10 @@ func reportMetrics(pa *pav1alpha1.PodAutoscaler, want int32, got int) error {
return err
}

reporter.ReportActualPodCount(int64(got))
reporter.ReportActualPodCount(int64(ready))
reporter.ReportNotReadyPodCount(int64(notReady))
reporter.ReportPendingPodCount(int64(pending))
reporter.ReportTerminatingPodCount(int64(terminating))
// Negative "want" values represent an empty metrics pipeline and thus no specific request is being made.
if want >= 0 {
reporter.ReportRequestedPodCount(int64(want))
Expand Down
Loading

0 comments on commit 627e676

Please sign in to comment.