add allocation scheduling duration metric
cjerad committed Sep 16, 2021
1 parent 53bc263 commit 08a42bd
Showing 4 changed files with 121 additions and 30 deletions.
20 changes: 8 additions & 12 deletions pkg/controllers/allocation/bind.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
@@ -29,7 +30,7 @@ import (
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

type Binder interface {
@@ -99,23 +100,18 @@ type binderMetricsDecorator struct {
bindTimeHistogramVec *prometheus.HistogramVec
}

const metricLabelResult = "result"

func DecorateBinderMetrics(binder Binder) Binder {
bindTimeHistogramVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "karpenter",
Namespace: metrics.KarpenterNamespace,
Subsystem: "allocation_controller",
Name: "bind_duration_seconds",
Help: "Duration of bind process in seconds. Broken down by result.",
// Use same bucket thresholds as controller-runtime.
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60},
Buckets: metrics.DurationBuckets(),
},
[]string{metricLabelResult},
[]string{metrics.ResultLabel},
)
metrics.Registry.MustRegister(bindTimeHistogramVec)
crmetrics.Registry.MustRegister(bindTimeHistogramVec)

return &binderMetricsDecorator{binder: binder, bindTimeHistogramVec: bindTimeHistogramVec}
}
@@ -130,11 +126,11 @@ func (b *binderMetricsDecorator) Bind(ctx context.Context, node *v1.Node, pods [
result = "error"
}

observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metricLabelResult: result})
observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metrics.ResultLabel: result})
if promErr != nil {
logging.FromContext(ctx).Warnf(
"Failed to record bind duration metric [%s=%s, duration=%f]: error=%w",
metricLabelResult,
metrics.ResultLabel,
result,
durationSeconds,
promErr,
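
The hunks above boil down to a small decorator pattern: wrap the Binder interface, time each Bind call, and observe the duration into a HistogramVec labeled by result. What follows is a minimal, self-contained sketch of that pattern and is not part of the diff; fakeBinder, DecorateBinder, and registration with the default Prometheus registry are illustrative only (the commit registers with controller-runtime's crmetrics.Registry and wraps the real binder).

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Binder is the interface being decorated; the real interface takes a
// context, a node, and pods, but one method is enough to show the pattern.
type Binder interface {
	Bind() error
}

type fakeBinder struct{}

func (fakeBinder) Bind() error { return errors.New("simulated failure") }

// binderWithMetrics wraps another Binder and records how long each call
// took, labeled by whether it succeeded or failed.
type binderWithMetrics struct {
	inner    Binder
	duration *prometheus.HistogramVec
}

func DecorateBinder(inner Binder) Binder {
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "karpenter",
		Subsystem: "allocation_controller",
		Name:      "bind_duration_seconds",
		Help:      "Duration of bind process in seconds. Broken down by result.",
	}, []string{"result"})
	prometheus.MustRegister(hv) // the commit uses controller-runtime's registry instead
	return &binderWithMetrics{inner: inner, duration: hv}
}

func (b *binderWithMetrics) Bind() error {
	start := time.Now()
	err := b.inner.Bind()
	durationSeconds := time.Since(start).Seconds()
	result := "success"
	if err != nil {
		result = "error"
	}
	// GetMetricWith fails only if the labels don't match the vector's
	// definition; the commit logs a warning in that case instead of observing.
	if observer, promErr := b.duration.GetMetricWith(prometheus.Labels{"result": result}); promErr == nil {
		observer.Observe(durationSeconds)
	}
	return err
}

func main() {
	binder := DecorateBinder(fakeBinder{})
	fmt.Println(binder.Bind()) // "simulated failure"; the histogram now holds one "error" observation
}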
4 changes: 2 additions & 2 deletions pkg/controllers/allocation/controller.go
@@ -56,7 +56,7 @@ type Controller struct {
Batcher *Batcher
Filter *Filter
Binder Binder
Scheduler *scheduling.Scheduler
Scheduler scheduling.Scheduler
Packer binpacking.Packer
CloudProvider cloudprovider.CloudProvider
KubeClient client.Client
@@ -68,7 +68,7 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface
Filter: &Filter{KubeClient: kubeClient},
Binder: DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)),
Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout),
Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient),
Scheduler: scheduling.DecorateSchedulerMetrics(scheduling.NewScheduler(cloudProvider, kubeClient)),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: kubeClient,
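
Switching the Controller field from the concrete *scheduling.Scheduler to the scheduling.Scheduler interface is what makes the decoration above possible, and it also lets tests substitute a stub. A hedged sketch, not part of the commit — the stub and test are hypothetical, and the import paths assume the github.com/awslabs/karpenter module path in use at the time:

package allocation_test

import (
	"context"
	"testing"

	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
	"github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling"
	v1 "k8s.io/api/core/v1"
)

// fakeScheduler exists only in this sketch; any type with a matching Solve
// method satisfies scheduling.Scheduler, which is exactly what lets
// DecorateSchedulerMetrics wrap the real implementation.
type fakeScheduler struct {
	schedules []*scheduling.Schedule
	err       error
}

func (f fakeScheduler) Solve(context.Context, *v1alpha3.Provisioner, []*v1.Pod) ([]*scheduling.Schedule, error) {
	return f.schedules, f.err
}

var _ scheduling.Scheduler = fakeScheduler{}

func TestFakeSchedulerSatisfiesInterface(t *testing.T) {
	s := fakeScheduler{}
	if _, err := s.Solve(context.Background(), &v1alpha3.Provisioner{}, nil); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}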
94 changes: 78 additions & 16 deletions pkg/controllers/allocation/scheduling/scheduler.go
@@ -17,30 +17,29 @@ package scheduling
import (
"context"
"fmt"
"time"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/metrics"
"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) *Scheduler {
return &Scheduler{
KubeClient: kubeClient,
Topology: &Topology{
cloudProvider: cloudProvider,
kubeClient: kubeClient,
},
}
type Scheduler interface {
Solve(context.Context, *v1alpha3.Provisioner, []*v1.Pod) ([]*Schedule, error)
}

type Scheduler struct {
KubeClient client.Client
Topology *Topology
type scheduler struct {
kubeClient client.Client
topology *Topology
}

type Schedule struct {
@@ -51,12 +50,22 @@ type Schedule struct {
Daemons []*v1.Pod
}

func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) Scheduler {
return &scheduler{
kubeClient: kubeClient,
topology: &Topology{
cloudProvider: cloudProvider,
kubeClient: kubeClient,
},
}
}

func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
// 1. Inject temporarily adds specific NodeSelectors to pods, which are then
// used by scheduling logic. This isn't strictly necessary, but is a useful
// trick to avoid passing topology decisions through the scheduling code. It
// lets us treat TopologySpreadConstraints as just-in-time NodeSelectors.
if err := s.Topology.Inject(ctx, provisioner, pods); err != nil {
if err := s.topology.Inject(ctx, provisioner, pods); err != nil {
return nil, fmt.Errorf("injecting topology, %w", err)
}

@@ -79,7 +88,7 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner
// getSchedules separates pods into a set of schedules. All pods in each group
// contain compatible scheduling constraints and can be deployed together on the
// same node, or multiple similar nodes if the pods exceed one node's capacity.
func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
// schedule uniqueness is tracked by hash(Constraints)
schedules := map[uint64]*Schedule{}
for _, pod := range pods {
@@ -116,10 +125,10 @@ func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Prov
return result, nil
}

func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) {
func (s *scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) {
// 1. Get DaemonSets
daemonSetList := &appsv1.DaemonSetList{}
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
if err := s.kubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

@@ -147,3 +156,56 @@ func IsSchedulable(pod *v1.Pod, node *v1.Node) bool {
// TODO, support node affinity
return true
}

type schedulerMetricsDecorator struct {
scheduler Scheduler
scheduleTimeHistogramVec *prometheus.HistogramVec
}

func DecorateSchedulerMetrics(scheduler Scheduler) Scheduler {
scheduleTimeHistogramVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.KarpenterNamespace,
Subsystem: "allocation_controller",
Name: "scheduling_duration_seconds",
Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.",
Buckets: metrics.DurationBuckets(),
},
[]string{metrics.ProvisionerLabel, metrics.ResultLabel},
)
crmetrics.Registry.MustRegister(scheduleTimeHistogramVec)

return &schedulerMetricsDecorator{scheduler: scheduler, scheduleTimeHistogramVec: scheduleTimeHistogramVec}
}

func (s *schedulerMetricsDecorator) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
startTime := time.Now()
schedules, scheduleErr := s.scheduler.Solve(ctx, provisioner, pods)
durationSeconds := time.Since(startTime).Seconds()

result := "success"
if scheduleErr != nil {
result = "error"
}

provisionerName := provisioner.ObjectMeta.Name
observer, promErr := s.scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{
metrics.ProvisionerLabel: provisionerName,
metrics.ResultLabel: result,
})
if promErr != nil {
logging.FromContext(ctx).Warnf(
"Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w",
metrics.ProvisionerLabel,
provisionerName,
metrics.ResultLabel,
result,
durationSeconds,
promErr,
)
} else {
observer.Observe(durationSeconds)
}

return schedules, scheduleErr
}
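
The decorator above exposes karpenter_allocation_controller_scheduling_duration_seconds with provisioner and result labels. Not part of the commit, but as a sketch of how such a histogram can be exercised and checked with client_golang's testutil package (the vector here is built locally and left unregistered so the example stays self-contained):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// A histogram vector shaped like the one the decorator registers:
	// same namespace, subsystem, name, and label names.
	hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "karpenter",
		Subsystem: "allocation_controller",
		Name:      "scheduling_duration_seconds",
		Help:      "Duration of scheduling process in seconds. Broken down by provisioner and result.",
	}, []string{"provisioner", "result"})

	// Simulate what the decorator does after Solve returns.
	start := time.Now()
	// ... scheduling work would happen here ...
	hv.With(prometheus.Labels{"provisioner": "default", "result": "success"}).Observe(time.Since(start).Seconds())

	// CollectAndCount reports how many series the collector exposes under the
	// fully qualified metric name — 1 here, because a single
	// (provisioner, result) label combination has been observed.
	fmt.Println(testutil.CollectAndCount(hv, "karpenter_allocation_controller_scheduling_duration_seconds"))
}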
33 changes: 33 additions & 0 deletions pkg/metrics/constants.go
@@ -0,0 +1,33 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

const (
// Common namespace for application metrics.
KarpenterNamespace = "karpenter"

// Common set of metric label names.
ResultLabel = "result"
ProvisionerLabel = "provisioner"
)

// DurationBuckets returns a []float64 of default threshold values for duration histograms.
// Each returned slice is new and may be modified without impacting other bucket definitions.
func DurationBuckets() []float64 {
// Use same bucket thresholds as controller-runtime.
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48
return []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60}
}
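
The new package centralizes the namespace, label names, and bucket thresholds so other controllers can emit consistent histograms. A hedged sketch of such reuse — the termination_controller subsystem and drain_duration_seconds metric are invented for illustration, and the import path assumes the github.com/awslabs/karpenter module path:

package main

import (
	"fmt"

	"github.com/awslabs/karpenter/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A hypothetical histogram for another controller, reusing the shared
	// namespace, label name, and bucket thresholds introduced by this commit.
	drainDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: metrics.KarpenterNamespace,
		Subsystem: "termination_controller",
		Name:      "drain_duration_seconds",
		Help:      "Duration of node drain in seconds. Broken down by result.",
		Buckets:   metrics.DurationBuckets(),
	}, []string{metrics.ResultLabel})
	_ = drainDuration

	// DurationBuckets returns a fresh slice on every call, so one caller can
	// extend its copy without affecting histograms defined elsewhere.
	longTail := append(metrics.DurationBuckets(), 120, 300)
	fmt.Println(len(longTail) > len(metrics.DurationBuckets())) // true — the two calls returned independent slices
}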
