From 08a42bd85ba3384da7d1661c689cd339121afb98 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Wed, 15 Sep 2021 11:36:35 -0500 Subject: [PATCH 1/3] add allocation scheduling duration metric --- pkg/controllers/allocation/bind.go | 20 ++-- pkg/controllers/allocation/controller.go | 4 +- .../allocation/scheduling/scheduler.go | 94 +++++++++++++++---- pkg/metrics/constants.go | 33 +++++++ 4 files changed, 121 insertions(+), 30 deletions(-) create mode 100644 pkg/metrics/constants.go diff --git a/pkg/controllers/allocation/bind.go b/pkg/controllers/allocation/bind.go index 1d0ba31873c3..bd6b614f712e 100644 --- a/pkg/controllers/allocation/bind.go +++ b/pkg/controllers/allocation/bind.go @@ -20,6 +20,7 @@ import ( "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3" + "github.com/awslabs/karpenter/pkg/metrics" "github.com/prometheus/client_golang/prometheus" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" @@ -29,7 +30,7 @@ import ( "k8s.io/client-go/util/workqueue" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/metrics" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) type Binder interface { @@ -99,23 +100,18 @@ type binderMetricsDecorator struct { bindTimeHistogramVec *prometheus.HistogramVec } -const metricLabelResult = "result" - func DecorateBinderMetrics(binder Binder) Binder { bindTimeHistogramVec := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: "karpenter", + Namespace: metrics.KarpenterNamespace, Subsystem: "allocation_controller", Name: "bind_duration_seconds", Help: "Duration of bind process in seconds. Broken down by result.", - // Use same bucket thresholds as controller-runtime. - // https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48 - Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, - 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60}, + Buckets: metrics.DurationBuckets(), }, - []string{metricLabelResult}, + []string{metrics.ResultLabel}, ) - metrics.Registry.MustRegister(bindTimeHistogramVec) + crmetrics.Registry.MustRegister(bindTimeHistogramVec) return &binderMetricsDecorator{binder: binder, bindTimeHistogramVec: bindTimeHistogramVec} } @@ -130,11 +126,11 @@ func (b *binderMetricsDecorator) Bind(ctx context.Context, node *v1.Node, pods [ result = "error" } - observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metricLabelResult: result}) + observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metrics.ResultLabel: result}) if promErr != nil { logging.FromContext(ctx).Warnf( "Failed to record bind duration metric [%s=%s, duration=%f]: error=%w", - metricLabelResult, + metrics.ResultLabel, result, durationSeconds, promErr, diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index ba831e2f1b66..1527390ddfaa 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -56,7 +56,7 @@ type Controller struct { Batcher *Batcher Filter *Filter Binder Binder - Scheduler *scheduling.Scheduler + Scheduler scheduling.Scheduler Packer binpacking.Packer CloudProvider cloudprovider.CloudProvider KubeClient client.Client @@ -68,7 +68,7 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface Filter: &Filter{KubeClient: 
kubeClient}, Binder: DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)), Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), - Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient), + Scheduler: scheduling.DecorateSchedulerMetrics(scheduling.NewScheduler(cloudProvider, kubeClient)), Packer: binpacking.NewPacker(), CloudProvider: cloudProvider, KubeClient: kubeClient, diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/allocation/scheduling/scheduler.go index 58780969973b..ea924f04b529 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/allocation/scheduling/scheduler.go @@ -17,30 +17,29 @@ package scheduling import ( "context" "fmt" + "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3" "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/metrics" "github.com/mitchellh/hashstructure/v2" + "github.com/prometheus/client_golang/prometheus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) -func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) *Scheduler { - return &Scheduler{ - KubeClient: kubeClient, - Topology: &Topology{ - cloudProvider: cloudProvider, - kubeClient: kubeClient, - }, - } +type Scheduler interface { + Solve(context.Context, *v1alpha3.Provisioner, []*v1.Pod) ([]*Schedule, error) } -type Scheduler struct { - KubeClient client.Client - Topology *Topology +type scheduler struct { + kubeClient client.Client + topology *Topology } type Schedule struct { @@ -51,12 +50,22 @@ type Schedule struct { Daemons []*v1.Pod } -func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { +func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) Scheduler { + return &scheduler{ + kubeClient: kubeClient, + topology: &Topology{ + cloudProvider: cloudProvider, + kubeClient: kubeClient, + }, + } +} + +func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { // 1. Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It // lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors. - if err := s.Topology.Inject(ctx, provisioner, pods); err != nil { + if err := s.topology.Inject(ctx, provisioner, pods); err != nil { return nil, fmt.Errorf("injecting topology, %w", err) } @@ -79,7 +88,7 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner // getSchedules separates pods into a set of schedules. All pods in each group // contain compatible scheduling constarints and can be deployed together on the // same node, or multiple similar nodes if the pods exceed one node's capacity. 
-func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { +func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { // schedule uniqueness is tracked by hash(Constraints) schedules := map[uint64]*Schedule{} for _, pod := range pods { @@ -116,10 +125,10 @@ func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Prov return result, nil } -func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { +func (s *scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { // 1. Get DaemonSets daemonSetList := &appsv1.DaemonSetList{} - if err := s.KubeClient.List(ctx, daemonSetList); err != nil { + if err := s.kubeClient.List(ctx, daemonSetList); err != nil { return nil, fmt.Errorf("listing daemonsets, %w", err) } @@ -147,3 +156,56 @@ func IsSchedulable(pod *v1.Pod, node *v1.Node) bool { // TODO, support node affinity return true } + +type schedulerMetricsDecorator struct { + scheduler Scheduler + scheduleTimeHistogramVec *prometheus.HistogramVec +} + +func DecorateSchedulerMetrics(scheduler Scheduler) Scheduler { + scheduleTimeHistogramVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metrics.KarpenterNamespace, + Subsystem: "allocation_controller", + Name: "scheduling_duration_seconds", + Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.", + Buckets: metrics.DurationBuckets(), + }, + []string{metrics.ProvisionerLabel, metrics.ResultLabel}, + ) + crmetrics.Registry.MustRegister(scheduleTimeHistogramVec) + + return &schedulerMetricsDecorator{scheduler: scheduler, scheduleTimeHistogramVec: scheduleTimeHistogramVec} +} + +func (s *schedulerMetricsDecorator) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { + startTime := time.Now() + schedules, scheduleErr := s.scheduler.Solve(ctx, provisioner, pods) + durationSeconds := time.Since(startTime).Seconds() + + result := "success" + if scheduleErr != nil { + result = "error" + } + + provisionerName := provisioner.ObjectMeta.Name + observer, promErr := s.scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{ + metrics.ProvisionerLabel: provisionerName, + metrics.ResultLabel: result, + }) + if promErr != nil { + logging.FromContext(ctx).Warnf( + "Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w", + metrics.ProvisionerLabel, + provisionerName, + metrics.ResultLabel, + result, + durationSeconds, + promErr, + ) + } else { + observer.Observe(durationSeconds) + } + + return schedules, scheduleErr +} diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go new file mode 100644 index 000000000000..1b148e6f2ab7 --- /dev/null +++ b/pkg/metrics/constants.go @@ -0,0 +1,33 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +const ( + // Common namespace for application metrics. 
+ KarpenterNamespace = "karpenter" + + // Common set of metric label names. + ResultLabel = "result" + ProvisionerLabel = "provisioner" +) + +// DurationBuckets returns a []float64 of default threshold values for duration histograms. +// Each returned slice is new and may be modified without impacting other bucket definitions. +func DurationBuckets() []float64 { + // Use same bucket thresholds as controller-runtime. + // https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48 + return []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, + 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} +} From 5123d9180887b4794e93f28d3ad2429ec154809d Mon Sep 17 00:00:00 2001 From: Jerad C Date: Thu, 16 Sep 2021 13:43:07 -0500 Subject: [PATCH 2/3] remove decorator pattern --- pkg/controllers/allocation/controller.go | 4 +- .../allocation/scheduling/scheduler.go | 124 ++++++++---------- 2 files changed, 59 insertions(+), 69 deletions(-) diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index 1527390ddfaa..ba831e2f1b66 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -56,7 +56,7 @@ type Controller struct { Batcher *Batcher Filter *Filter Binder Binder - Scheduler scheduling.Scheduler + Scheduler *scheduling.Scheduler Packer binpacking.Packer CloudProvider cloudprovider.CloudProvider KubeClient client.Client @@ -68,7 +68,7 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface Filter: &Filter{KubeClient: kubeClient}, Binder: DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)), Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), - Scheduler: scheduling.DecorateSchedulerMetrics(scheduling.NewScheduler(cloudProvider, kubeClient)), + Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient), Packer: binpacking.NewPacker(), CloudProvider: cloudProvider, KubeClient: kubeClient, diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/allocation/scheduling/scheduler.go index ea924f04b529..4344e8ce7ec6 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/allocation/scheduling/scheduler.go @@ -33,13 +33,24 @@ import ( crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) -type Scheduler interface { - Solve(context.Context, *v1alpha3.Provisioner, []*v1.Pod) ([]*Schedule, error) +var scheduleTimeHistogramVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metrics.KarpenterNamespace, + Subsystem: "allocation_controller", + Name: "scheduling_duration_seconds", + Help: "Duration of scheduling process in seconds. 
Broken down by provisioner and result.", + Buckets: metrics.DurationBuckets(), + }, + []string{metrics.ProvisionerLabel, metrics.ResultLabel}, +) + +func init() { + crmetrics.Registry.MustRegister(scheduleTimeHistogramVec) } -type scheduler struct { - kubeClient client.Client - topology *Topology +type Scheduler struct { + KubeClient client.Client + Topology *Topology } type Schedule struct { @@ -50,22 +61,54 @@ type Schedule struct { Daemons []*v1.Pod } -func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) Scheduler { - return &scheduler{ - kubeClient: kubeClient, - topology: &Topology{ +func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) *Scheduler { + return &Scheduler{ + KubeClient: kubeClient, + Topology: &Topology{ cloudProvider: cloudProvider, kubeClient: kubeClient, }, } } -func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { +func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { + startTime := time.Now() + schedules, scheduleErr := s.solve(ctx, provisioner, pods) + durationSeconds := time.Since(startTime).Seconds() + + result := "success" + if scheduleErr != nil { + result = "error" + } + + provisionerName := provisioner.ObjectMeta.Name + observer, promErr := scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{ + metrics.ProvisionerLabel: provisionerName, + metrics.ResultLabel: result, + }) + if promErr != nil { + logging.FromContext(ctx).Warnf( + "Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w", + metrics.ProvisionerLabel, + provisionerName, + metrics.ResultLabel, + result, + durationSeconds, + promErr, + ) + } else { + observer.Observe(durationSeconds) + } + + return schedules, scheduleErr +} + +func (s *Scheduler) solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { // 1. Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It // lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors. - if err := s.topology.Inject(ctx, provisioner, pods); err != nil { + if err := s.Topology.Inject(ctx, provisioner, pods); err != nil { return nil, fmt.Errorf("injecting topology, %w", err) } @@ -88,7 +131,7 @@ func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner // getSchedules separates pods into a set of schedules. All pods in each group // contain compatible scheduling constarints and can be deployed together on the // same node, or multiple similar nodes if the pods exceed one node's capacity. -func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { +func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { // schedule uniqueness is tracked by hash(Constraints) schedules := map[uint64]*Schedule{} for _, pod := range pods { @@ -125,10 +168,10 @@ func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Prov return result, nil } -func (s *scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { +func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { // 1. 
Get DaemonSets daemonSetList := &appsv1.DaemonSetList{} - if err := s.kubeClient.List(ctx, daemonSetList); err != nil { + if err := s.KubeClient.List(ctx, daemonSetList); err != nil { return nil, fmt.Errorf("listing daemonsets, %w", err) } @@ -156,56 +199,3 @@ func IsSchedulable(pod *v1.Pod, node *v1.Node) bool { // TODO, support node affinity return true } - -type schedulerMetricsDecorator struct { - scheduler Scheduler - scheduleTimeHistogramVec *prometheus.HistogramVec -} - -func DecorateSchedulerMetrics(scheduler Scheduler) Scheduler { - scheduleTimeHistogramVec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: metrics.KarpenterNamespace, - Subsystem: "allocation_controller", - Name: "scheduling_duration_seconds", - Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.", - Buckets: metrics.DurationBuckets(), - }, - []string{metrics.ProvisionerLabel, metrics.ResultLabel}, - ) - crmetrics.Registry.MustRegister(scheduleTimeHistogramVec) - - return &schedulerMetricsDecorator{scheduler: scheduler, scheduleTimeHistogramVec: scheduleTimeHistogramVec} -} - -func (s *schedulerMetricsDecorator) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { - startTime := time.Now() - schedules, scheduleErr := s.scheduler.Solve(ctx, provisioner, pods) - durationSeconds := time.Since(startTime).Seconds() - - result := "success" - if scheduleErr != nil { - result = "error" - } - - provisionerName := provisioner.ObjectMeta.Name - observer, promErr := s.scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{ - metrics.ProvisionerLabel: provisionerName, - metrics.ResultLabel: result, - }) - if promErr != nil { - logging.FromContext(ctx).Warnf( - "Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w", - metrics.ProvisionerLabel, - provisionerName, - metrics.ResultLabel, - result, - durationSeconds, - promErr, - ) - } else { - observer.Observe(durationSeconds) - } - - return schedules, scheduleErr -} From a9d69d26b6845fd6fb0b2e1f683ac8f83223d757 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Thu, 16 Sep 2021 15:20:27 -0500 Subject: [PATCH 3/3] include metric labels in log message * Fix error format specifier in log message --- .../allocation/scheduling/scheduler.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/allocation/scheduling/scheduler.go index 4344e8ce7ec6..311cf79d55cb 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/allocation/scheduling/scheduler.go @@ -81,20 +81,17 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner result = "error" } - provisionerName := provisioner.ObjectMeta.Name - observer, promErr := scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{ - metrics.ProvisionerLabel: provisionerName, + labels := prometheus.Labels{ + metrics.ProvisionerLabel: provisioner.ObjectMeta.Name, metrics.ResultLabel: result, - }) + } + observer, promErr := scheduleTimeHistogramVec.GetMetricWith(labels) if promErr != nil { logging.FromContext(ctx).Warnf( - "Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w", - metrics.ProvisionerLabel, - provisionerName, - metrics.ResultLabel, - result, + "Failed to record scheduling duration metric [labels=%s, duration=%f]: error=%s", + labels, durationSeconds, - promErr, + promErr.Error(), ) } else { observer.Observe(durationSeconds)
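
For reference, the end state of this series — a package-level HistogramVec registered once in init(), and an exported Solve that times the unexported solve, derives a result label from the error, and logs instead of failing when the label lookup goes wrong — can be reduced to a small standalone sketch. This is illustrative only: the names solve/Solve and main, the use of fmt logging, and registration against the default Prometheus registry (the patches register with controller-runtime's crmetrics.Registry so the metric is served on the manager's /metrics endpoint) are assumptions made to keep the sketch self-contained, not part of the change itself.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Package-level histogram, mirroring scheduleTimeHistogramVec in PATCH 2/3.
var scheduleDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "karpenter",
		Subsystem: "allocation_controller",
		Name:      "scheduling_duration_seconds",
		Help:      "Duration of scheduling process in seconds. Broken down by provisioner and result.",
		// The patch supplies metrics.DurationBuckets(); the client library's
		// defaults stand in here to keep the sketch dependency-light.
		Buckets: prometheus.DefBuckets,
	},
	[]string{"provisioner", "result"},
)

func init() {
	// The patch registers with controller-runtime's crmetrics.Registry; the
	// default registry is used here so the sketch compiles on its own.
	prometheus.MustRegister(scheduleDuration)
}

// solve stands in for the unexported Scheduler.solve; hypothetical body.
func solve(provisionerName string) error {
	time.Sleep(5 * time.Millisecond)
	if provisionerName == "" {
		return errors.New("provisioner name is required")
	}
	return nil
}

// Solve mirrors the exported wrapper from PATCH 2/3 and 3/3: time the inner
// call, set the result label from the error, and record the observation,
// warning (rather than failing the reconcile) if the metric lookup errors.
func Solve(provisionerName string) error {
	startTime := time.Now()
	err := solve(provisionerName)
	durationSeconds := time.Since(startTime).Seconds()

	result := "success"
	if err != nil {
		result = "error"
	}
	labels := prometheus.Labels{"provisioner": provisionerName, "result": result}
	observer, promErr := scheduleDuration.GetMetricWith(labels)
	if promErr != nil {
		fmt.Printf("Failed to record scheduling duration metric [labels=%v, duration=%f]: error=%s\n",
			labels, durationSeconds, promErr.Error())
	} else {
		observer.Observe(durationSeconds)
	}
	return err
}

func main() {
	_ = Solve("default")
	_ = Solve("") // error path still records a sample, labeled result="error"
}

The shape of the final code reflects the decision in PATCH 2/3 to drop the decorator: Scheduler stays a concrete struct, instrumentation lives in the exported method, and registration happens once via package init() rather than at construction time, at the cost of package-level state.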