Skip to content

Commit

Permalink
add allocation bind duration metric
Browse files Browse the repository at this point in the history
  • Loading branch information
cjerad committed Sep 14, 2021
1 parent e1b9a1d commit 6743667
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 272 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.13.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.11.0
go.uber.org/multierr v1.7.0
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
k8s.io/api v0.20.7
Expand Down
260 changes: 0 additions & 260 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = BeforeSuite(func() {
registry.RegisterOrDie(cloudProvider)
controller = &allocation.Controller{
Filter: &allocation.Filter{KubeClient: e.Client},
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()},
Binder: allocation.NewBinder(e.Client, clientSet.CoreV1()),
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: binpacking.NewPacker(),
Expand Down
77 changes: 70 additions & 7 deletions pkg/controllers/allocation/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package allocation
import (
"context"
"fmt"
"time"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,14 +29,23 @@ import (
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

type Binder struct {
KubeClient client.Client
CoreV1Client corev1.CoreV1Interface
// Binder binds a set of schedulable pods to a provisioned node.
type Binder interface {
	// Bind binds the given pods to the given node. It returns an error
	// aggregating any per-pod bind failures.
	Bind(context.Context, *v1.Node, []*v1.Pod) error
}

func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
// binder is the default Binder implementation. It creates the node object
// via the core v1 client and binds pods with the controller-runtime client.
type binder struct {
	kubeClient   client.Client
	coreV1Client corev1.CoreV1Interface
}

// NewBinder constructs the default Binder implementation backed by the
// provided controller-runtime client and core v1 client.
func NewBinder(kubeClient client.Client, coreV1Client corev1.CoreV1Interface) Binder {
	b := &binder{
		kubeClient:   kubeClient,
		coreV1Client: coreV1Client,
	}
	return b
}

func (b *binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
// 1. Add the Karpenter finalizer to the node to enable the termination workflow
node.Finalizers = append(node.Finalizers, v1alpha3.TerminationFinalizer)
// 2. Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
Expand All @@ -55,7 +66,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
// with the API server. In the common case, we create the node object
// ourselves to enforce the binding decision and enable images to be pulled
// before the node is fully Ready.
if _, err := b.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
if _, err := b.coreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
if !errors.IsAlreadyExists(err) {
return fmt.Errorf("creating node %s, %w", node.Name, err)
}
Expand All @@ -71,9 +82,9 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
return err
}

func (b *Binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
func (b *binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
// TODO, Stop using deprecated v1.Binding
if err := b.CoreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{
if err := b.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
Target: v1.ObjectReference{Name: node.Name},
Expand All @@ -82,3 +93,55 @@ func (b *Binder) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
}
return nil
}

// binderMetricsDecorator wraps a Binder and records the duration of each
// Bind call in a Prometheus histogram, labeled by result.
type binderMetricsDecorator struct {
	binder               Binder
	bindTimeHistogramVec *prometheus.HistogramVec
}

// metricLabelResult is the histogram label distinguishing "success" from
// "error" outcomes of a Bind call.
const metricLabelResult = "result"

// DecorateBinderMetrics wraps the given Binder so that every Bind call is
// timed and recorded in a "karpenter_allocation_controller_bind_duration_seconds"
// histogram, broken down by result. The histogram is registered with the
// controller-runtime metrics registry; registration panics on conflict.
func DecorateBinderMetrics(binder Binder) Binder {
	opts := prometheus.HistogramOpts{
		Namespace: "karpenter",
		Subsystem: "allocation_controller",
		Name:      "bind_duration_seconds",
		Help:      "Duration of bind process in seconds. Broken down by result.",
		// Use same bucket thresholds as controller-runtime.
		// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48
		Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
			1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60},
	}
	histogram := prometheus.NewHistogramVec(opts, []string{metricLabelResult})
	metrics.Registry.MustRegister(histogram)
	return &binderMetricsDecorator{
		binder:               binder,
		bindTimeHistogramVec: histogram,
	}
}

// Bind delegates to the wrapped Binder, measures how long the call took,
// and records the duration in the bind-duration histogram labeled by
// "success" or "error". The underlying bind error (if any) is always
// returned; metric-recording failures are logged but never override it.
func (b *binderMetricsDecorator) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
	startTime := time.Now()
	bindErr := b.binder.Bind(ctx, node, pods)
	durationSeconds := time.Since(startTime).Seconds()

	result := "success"
	if bindErr != nil {
		result = "error"
	}

	observer, promErr := b.bindTimeHistogramVec.GetMetricWith(prometheus.Labels{metricLabelResult: result})
	if promErr != nil {
		// BUG FIX: %w is only meaningful in fmt.Errorf; SugaredLogger.Warnf
		// uses fmt.Sprintf semantics, where %w renders as a %!w(...) artifact.
		// Use %v to format the error for logging.
		logging.FromContext(ctx).Warnf(
			"Failed to record bind duration metric [%s=%s, duration=%f]: error=%v",
			metricLabelResult,
			result,
			durationSeconds,
			promErr,
		)
	} else {
		observer.Observe(durationSeconds)
	}

	return bindErr
}
4 changes: 2 additions & 2 deletions pkg/controllers/allocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (
type Controller struct {
Batcher *Batcher
Filter *Filter
Binder *Binder
Binder Binder
Scheduler *scheduling.Scheduler
Packer binpacking.Packer
CloudProvider cloudprovider.CloudProvider
Expand All @@ -66,7 +66,7 @@ type Controller struct {
func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
Filter: &Filter{KubeClient: kubeClient},
Binder: &Binder{KubeClient: kubeClient, CoreV1Client: coreV1Client},
Binder: DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)),
Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout),
Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient),
Packer: binpacking.NewPacker(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/allocation/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var _ = BeforeSuite(func() {
registry.RegisterOrDie(cloudProvider)
controller = &allocation.Controller{
Filter: &allocation.Filter{KubeClient: e.Client},
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
Binder: allocation.NewBinder(e.Client, corev1.NewForConfigOrDie(e.Config)),
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: binpacking.NewPacker(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/allocation/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var _ = BeforeSuite(func() {
registry.RegisterOrDie(cloudProvider)
controller = &allocation.Controller{
Filter: &allocation.Filter{KubeClient: e.Client},
Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)},
Binder: allocation.NewBinder(e.Client, corev1.NewForConfigOrDie(e.Config)),
Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond),
Scheduler: scheduling.NewScheduler(cloudProvider, e.Client),
Packer: binpacking.NewPacker(),
Expand Down

0 comments on commit 6743667

Please sign in to comment.