
Commit

remove decorator pattern
cjerad committed Sep 16, 2021
1 parent 08a42bd commit 5123d91
Showing 2 changed files with 59 additions and 69 deletions.
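The structural change is easier to see outside the diff: the old code exposed a Scheduler interface and layered metrics on top through a wrapping decorator type, while the new code keeps a single concrete Scheduler whose exported Solve times each call itself and delegates the real work to an unexported solve. The sketch below is a self-contained, simplified analogue of that shift; Solver, plainSolver, timingDecorator, and DirectSolver are illustrative stand-ins, not Karpenter types.

```go
package main

import (
	"fmt"
	"time"
)

// ----- Before: decorator pattern -----

// Solver stands in for the old Scheduler interface.
type Solver interface {
	Solve(pods []string) ([]string, error)
}

// plainSolver stands in for the old unexported scheduler struct.
type plainSolver struct{}

func (plainSolver) Solve(pods []string) ([]string, error) { return pods, nil }

// timingDecorator wraps any Solver and records how long Solve took,
// playing the role of the old schedulerMetricsDecorator.
type timingDecorator struct {
	inner   Solver
	observe func(seconds float64)
}

func (d timingDecorator) Solve(pods []string) ([]string, error) {
	start := time.Now()
	out, err := d.inner.Solve(pods)
	d.observe(time.Since(start).Seconds())
	return out, err
}

// ----- After: one concrete type records its own metrics -----

// DirectSolver corresponds to the new exported Scheduler struct: the exported
// method times the call and delegates to an unexported helper.
type DirectSolver struct {
	observe func(seconds float64)
}

func (s DirectSolver) Solve(pods []string) ([]string, error) {
	start := time.Now()
	out, err := s.solve(pods)
	s.observe(time.Since(start).Seconds())
	return out, err
}

func (s DirectSolver) solve(pods []string) ([]string, error) { return pods, nil }

func main() {
	record := func(seconds float64) { fmt.Printf("solve took %.6fs\n", seconds) }

	decorated := timingDecorator{inner: plainSolver{}, observe: record}
	if _, err := decorated.Solve([]string{"pod-a"}); err != nil {
		fmt.Println("decorated solve failed:", err)
	}

	direct := DirectSolver{observe: record}
	if _, err := direct.Solve([]string{"pod-b"}); err != nil {
		fmt.Println("direct solve failed:", err)
	}
}
```

Dropping the interface means callers such as the allocation Controller below now hold a *scheduling.Scheduler directly, and the metrics plumbing lives next to the code it measures.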
4 changes: 2 additions & 2 deletions pkg/controllers/allocation/controller.go
@@ -56,7 +56,7 @@ type Controller struct {
Batcher *Batcher
Filter *Filter
Binder Binder
Scheduler scheduling.Scheduler
Scheduler *scheduling.Scheduler
Packer binpacking.Packer
CloudProvider cloudprovider.CloudProvider
KubeClient client.Client
@@ -68,7 +68,7 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface
Filter: &Filter{KubeClient: kubeClient},
Binder: DecorateBinderMetrics(NewBinder(kubeClient, coreV1Client)),
Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout),
Scheduler: scheduling.DecorateSchedulerMetrics(scheduling.NewScheduler(cloudProvider, kubeClient)),
Scheduler: scheduling.NewScheduler(cloudProvider, kubeClient),
Packer: binpacking.NewPacker(),
CloudProvider: cloudProvider,
KubeClient: kubeClient,
124 changes: 57 additions & 67 deletions pkg/controllers/allocation/scheduling/scheduler.go
@@ -33,13 +33,24 @@ import (
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

type Scheduler interface {
Solve(context.Context, *v1alpha3.Provisioner, []*v1.Pod) ([]*Schedule, error)
var scheduleTimeHistogramVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.KarpenterNamespace,
Subsystem: "allocation_controller",
Name: "scheduling_duration_seconds",
Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.",
Buckets: metrics.DurationBuckets(),
},
[]string{metrics.ProvisionerLabel, metrics.ResultLabel},
)

func init() {
crmetrics.Registry.MustRegister(scheduleTimeHistogramVec)
}

type scheduler struct {
kubeClient client.Client
topology *Topology
type Scheduler struct {
KubeClient client.Client
Topology *Topology
}

type Schedule struct {
@@ -50,22 +61,54 @@ type Schedule struct {
Daemons []*v1.Pod
}

func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) Scheduler {
return &scheduler{
kubeClient: kubeClient,
topology: &Topology{
func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.Client) *Scheduler {
return &Scheduler{
KubeClient: kubeClient,
Topology: &Topology{
cloudProvider: cloudProvider,
kubeClient: kubeClient,
},
}
}

func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
startTime := time.Now()
schedules, scheduleErr := s.solve(ctx, provisioner, pods)
durationSeconds := time.Since(startTime).Seconds()

result := "success"
if scheduleErr != nil {
result = "error"
}

provisionerName := provisioner.ObjectMeta.Name
observer, promErr := scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{
metrics.ProvisionerLabel: provisionerName,
metrics.ResultLabel: result,
})
if promErr != nil {
logging.FromContext(ctx).Warnf(
"Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w",
metrics.ProvisionerLabel,
provisionerName,
metrics.ResultLabel,
result,
durationSeconds,
promErr,
)
} else {
observer.Observe(durationSeconds)
}

return schedules, scheduleErr
}

func (s *Scheduler) solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
// 1. Inject temporarily adds specific NodeSelectors to pods, which are then
// used by scheduling logic. This isn't strictly necessary, but is a useful
// trick to avoid passing topology decisions through the scheduling code. It
// lets us treat TopologySpreadConstraints as just-in-time NodeSelectors.
if err := s.topology.Inject(ctx, provisioner, pods); err != nil {
if err := s.Topology.Inject(ctx, provisioner, pods); err != nil {
return nil, fmt.Errorf("injecting topology, %w", err)
}

@@ -88,7 +131,7 @@ func (s *scheduler) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner
// getSchedules separates pods into a set of schedules. All pods in each group
contain compatible scheduling constraints and can be deployed together on the
// same node, or multiple similar nodes if the pods exceed one node's capacity.
func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
func (s *Scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
// schedule uniqueness is tracked by hash(Constraints)
schedules := map[uint64]*Schedule{}
for _, pod := range pods {
@@ -125,10 +168,10 @@ func (s *scheduler) getSchedules(ctx context.Context, provisioner *v1alpha3.Prov
return result, nil
}

func (s *scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) {
func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) {
// 1. Get DaemonSets
daemonSetList := &appsv1.DaemonSetList{}
if err := s.kubeClient.List(ctx, daemonSetList); err != nil {
if err := s.KubeClient.List(ctx, daemonSetList); err != nil {
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

@@ -156,56 +199,3 @@ func IsSchedulable(pod *v1.Pod, node *v1.Node) bool {
// TODO, support node affinity
return true
}

type schedulerMetricsDecorator struct {
scheduler Scheduler
scheduleTimeHistogramVec *prometheus.HistogramVec
}

func DecorateSchedulerMetrics(scheduler Scheduler) Scheduler {
scheduleTimeHistogramVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.KarpenterNamespace,
Subsystem: "allocation_controller",
Name: "scheduling_duration_seconds",
Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.",
Buckets: metrics.DurationBuckets(),
},
[]string{metrics.ProvisionerLabel, metrics.ResultLabel},
)
crmetrics.Registry.MustRegister(scheduleTimeHistogramVec)

return &schedulerMetricsDecorator{scheduler: scheduler, scheduleTimeHistogramVec: scheduleTimeHistogramVec}
}

func (s *schedulerMetricsDecorator) Solve(ctx context.Context, provisioner *v1alpha3.Provisioner, pods []*v1.Pod) ([]*Schedule, error) {
startTime := time.Now()
schedules, scheduleErr := s.scheduler.Solve(ctx, provisioner, pods)
durationSeconds := time.Since(startTime).Seconds()

result := "success"
if scheduleErr != nil {
result = "error"
}

provisionerName := provisioner.ObjectMeta.Name
observer, promErr := s.scheduleTimeHistogramVec.GetMetricWith(prometheus.Labels{
metrics.ProvisionerLabel: provisionerName,
metrics.ResultLabel: result,
})
if promErr != nil {
logging.FromContext(ctx).Warnf(
"Failed to record scheduling duration metric [%s=%s, %s=%s, duration=%f]: error=%w",
metrics.ProvisionerLabel,
provisionerName,
metrics.ResultLabel,
result,
durationSeconds,
promErr,
)
} else {
observer.Observe(durationSeconds)
}

return schedules, scheduleErr
}
