Skip to content

Commit

Permalink
add podsLister to kpa and report pod state metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nimakaviani committed Jan 15, 2020
1 parent d4c2097 commit b8f1101
Show file tree
Hide file tree
Showing 15 changed files with 561 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,30 @@ data:
"intervalFactor": 1,
"legendFormat": "Requested Pods",
"refId": "C"
},
{
"expr": "sum(autoscaler_pending_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Pending Pods",
"refId": "P"
},
{
"expr": "sum(autoscaler_not_ready_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "NotReady Pods",
"refId": "N"
},
{
"expr": "sum(autoscaler_terminating_pods{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Terminating Pods",
"refId": "T"
}
],
"thresholds": [],
Expand Down Expand Up @@ -172,7 +196,88 @@ data:
"show": false
}
]
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "prometheus",
"fill": 1,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 2
},
"id": 3,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": true,
"targets": [
{
"expr": "sum(activator_request_concurrency{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Request Concurrency",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Request Concurrency",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
}
],
"title": "Revision Pod Counts",
"type": "row"
Expand Down
4 changes: 2 additions & 2 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Autoscaler struct {
// specMux guards the current DeciderSpec and the PodCounter.
specMux sync.RWMutex
deciderSpec *DeciderSpec
podCounter resources.ReadyPodCounter
podCounter resources.EndpointsCounter
}

// New creates a new instance of autoscaler
Expand Down Expand Up @@ -246,7 +246,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount
return desiredPodCount, excessBC, true
}

func (a *Autoscaler) currentSpecAndPC() (*DeciderSpec, resources.ReadyPodCounter) {
func (a *Autoscaler) currentSpecAndPC() (*DeciderSpec, resources.EndpointsCounter) {
a.specMux.RLock()
defer a.specMux.RUnlock()
return a.deciderSpec, a.podCounter
Expand Down
19 changes: 17 additions & 2 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ var (
kubeInformer = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
)

func TestNewErrorWhenGivenNilReadyPodCounter(t *testing.T) {
func TestNewErrorWhenGivenNilEndpointsCounter(t *testing.T) {
_, err := New(testNamespace, testRevision, &autoscalerfake.MetricClient{}, nil, &DeciderSpec{TargetValue: 10, ServiceName: testService}, &mockReporter{})
if err == nil {
t.Error("Expected error when ReadyPodCounter interface is nil, but got none.")
t.Error("Expected error when EndpointsCounter interface is nil, but got none.")
}
}

Expand Down Expand Up @@ -335,6 +335,21 @@ func (r *mockReporter) ReportActualPodCount(v int64) error {
return nil
}

// ReportNotReadyPodCount of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportNotReadyPodCount(v int64) error {
return nil
}

// ReportPendingPodCount of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportPendingPodCount(v int64) error {
return nil
}

// ReportTerminatingPodCount of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportTerminatingPodCount(v int64) error {
return nil
}

// ReportStableRequestConcurrency of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportStableRequestConcurrency(v float64) error {
return nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/autoscaler/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ var (
"actual_pods",
"Number of pods that are allocated currently",
stats.UnitDimensionless)
notReadyPodCountM = stats.Int64(
"not_ready_pods",
"Number of pods that are not ready currently",
stats.UnitDimensionless)
pendingPodCountM = stats.Int64(
"pending_pods",
"Number of pods that are pending currently",
stats.UnitDimensionless)
terminatingPodCountM = stats.Int64(
"terminating_pods",
"Number of pods that are terminating currently",
stats.UnitDimensionless)
excessBurstCapacityM = stats.Float64(
"excess_burst_capacity",
"Excess burst capacity overserved over the stable window",
Expand Down Expand Up @@ -103,6 +115,24 @@ func register() {
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are not ready currently",
Measure: notReadyPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are pending currently",
Measure: pendingPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Number of pods that are terminating currently",
Measure: terminatingPodCountM,
Aggregation: view.LastValue(),
TagKeys: metrics.CommonRevisionKeys,
},
&view.View{
Description: "Average of requests count over the stable window",
Measure: stableRequestConcurrencyM,
Expand Down Expand Up @@ -161,6 +191,9 @@ type StatsReporter interface {
ReportDesiredPodCount(v int64) error
ReportRequestedPodCount(v int64) error
ReportActualPodCount(v int64) error
ReportNotReadyPodCount(v int64) error
ReportTerminatingPodCount(v int64) error
ReportPendingPodCount(v int64) error
ReportStableRequestConcurrency(v float64) error
ReportPanicRequestConcurrency(v float64) error
ReportTargetRequestConcurrency(v float64) error
Expand Down Expand Up @@ -221,6 +254,21 @@ func (r *Reporter) ReportActualPodCount(v int64) error {
return r.report(actualPodCountM.M(v))
}

// ReportNotReadyPodCount captures value v for not ready pod count measure.
func (r *Reporter) ReportNotReadyPodCount(v int64) error {
return r.report(notReadyPodCountM.M(v))
}

// ReportPendingPodCount captures value v for pending pod count measure.
func (r *Reporter) ReportPendingPodCount(v int64) error {
return r.report(pendingPodCountM.M(v))
}

// ReportTerminatingPodCount captures value v for terminating pod count measure.
func (r *Reporter) ReportTerminatingPodCount(v int64) error {
return r.report(terminatingPodCountM.M(v))
}

// ReportExcessBurstCapacity captures value v for excess target burst capacity.
func (r *Reporter) ReportExcessBurstCapacity(v float64) error {
return r.report(excessBurstCapacityM.M(v))
Expand Down
24 changes: 24 additions & 0 deletions pkg/autoscaler/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(10) })
expectSuccess(t, "ReportRequestedPodCount", func() error { return r.ReportRequestedPodCount(7) })
expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(5) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(9) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(6) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) })
expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) })
expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) })
Expand All @@ -69,6 +72,9 @@ func TestReporterReport(t *testing.T) {
metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10)
metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7)
metricstest.CheckLastValueData(t, "actual_pods", wantTags, 5)
metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 9)
metricstest.CheckLastValueData(t, "pending_pods", wantTags, 6)
metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8)
metricstest.CheckLastValueData(t, "panic_mode", wantTags, 0)
metricstest.CheckLastValueData(t, "stable_request_concurrency", wantTags, 2)
metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84)
Expand All @@ -94,6 +100,21 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportActualPodCount", func() error { return r.ReportActualPodCount(9) })
metricstest.CheckLastValueData(t, "actual_pods", wantTags, 9)

expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(6) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(5) })
expectSuccess(t, "ReportNotReadyPodCount", func() error { return r.ReportNotReadyPodCount(4) })
metricstest.CheckLastValueData(t, "not_ready_pods", wantTags, 4)

expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(3) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(2) })
expectSuccess(t, "ReportPendingPodCount", func() error { return r.ReportPendingPodCount(1) })
metricstest.CheckLastValueData(t, "pending_pods", wantTags, 1)

expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(5) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(3) })
expectSuccess(t, "ReportTerminatingPodCount", func() error { return r.ReportTerminatingPodCount(8) })
metricstest.CheckLastValueData(t, "terminating_pods", wantTags, 8)

expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(0) })
expectSuccess(t, "ReportPanic", func() error { return r.ReportPanic(1) })
Expand Down Expand Up @@ -131,6 +152,9 @@ func resetMetrics() {
desiredPodCountM.Name(),
requestedPodCountM.Name(),
actualPodCountM.Name(),
notReadyPodCountM.Name(),
pendingPodCountM.Name(),
terminatingPodCountM.Name(),
stableRequestConcurrencyM.Name(),
panicRequestConcurrencyM.Name(),
excessBurstCapacityM.Name(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/autoscaler/stats_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ var cacheDisabledClient = &http.Client{
// for details.
type ServiceScraper struct {
sClient scrapeClient
counter resources.ReadyPodCounter
counter resources.EndpointsCounter
url string
}

// NewServiceScraper creates a new StatsScraper for the Revision which
// the given Metric is responsible for.
func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCounter) (*ServiceScraper, error) {
func NewServiceScraper(metric *av1alpha1.Metric, counter resources.EndpointsCounter) (*ServiceScraper, error) {
sClient, err := newHTTPScrapeClient(cacheDisabledClient)
if err != nil {
return nil, err
Expand All @@ -102,7 +102,7 @@ func NewServiceScraper(metric *av1alpha1.Metric, counter resources.ReadyPodCount

func newServiceScraperWithClient(
metric *av1alpha1.Metric,
counter resources.ReadyPodCounter,
counter resources.EndpointsCounter,
sClient scrapeClient) (*ServiceScraper, error) {
if metric == nil {
return nil, errors.New("metric must not be nil")
Expand Down
2 changes: 1 addition & 1 deletion pkg/autoscaler/stats_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) {
name string
metric *av1alpha1.Metric
client scrapeClient
counter resources.ReadyPodCounter
counter resources.EndpointsCounter
expectedErr string
}{{
name: "Empty Decider",
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/autoscaling/kpa/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/serving/pkg/client/injection/ducks/autoscaling/v1alpha1/podscalable"
metricinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric"
Expand Down Expand Up @@ -51,6 +52,7 @@ func NewController(
sksInformer := sksinformer.Get(ctx)
serviceInformer := serviceinformer.Get(ctx)
endpointsInformer := endpointsinformer.Get(ctx)
podsInformer := podinformer.Get(ctx)
metricInformer := metricinformer.Get(ctx)
psInformerFactory := podscalable.Get(ctx)

Expand All @@ -64,6 +66,7 @@ func NewController(
PSInformerFactory: psInformerFactory,
},
endpointsLister: endpointsInformer.Lister(),
podsLister: podsInformer.Lister(),
deciders: deciders,
}
impl := controller.NewImpl(c, c.Logger, "KPA-Class Autoscaling")
Expand Down
Loading

0 comments on commit b8f1101

Please sign in to comment.