diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 3b6551e560e1..d461fe108770 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -26,8 +26,8 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/node" "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/controllers/termination" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/awslabs/karpenter/pkg/utils/options" - "github.com/awslabs/karpenter/pkg/utils/restconfig" "github.com/go-logr/zapr" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -37,7 +37,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/util/flowcontrol" "knative.dev/pkg/configmap/informer" - "knative.dev/pkg/injection" + knativeinjection "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/logging" "knative.dev/pkg/signals" @@ -67,8 +67,8 @@ func main() { // Set up logger and watch for changes to log level ctx := LoggingContextOrDie(config, clientSet) - ctx = restconfig.Inject(ctx, config) - ctx = options.Inject(ctx, opts) + ctx = injection.WithConfig(ctx, config) + ctx = injection.WithOptions(ctx, opts) // Set up controller runtime controller cloudProvider := registry.NewCloudProvider(ctx, cloudprovider.Options{ClientSet: clientSet}) @@ -99,7 +99,7 @@ func main() { // LoggingContextOrDie injects a logger into the returned context. The logger is // configured by the ConfigMap `config-logging` and live updates the level. func LoggingContextOrDie(config *rest.Config, clientSet *kubernetes.Clientset) context.Context { - ctx, startinformers := injection.EnableInjectionOrDie(signals.NewContext(), config) + ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), config) logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, component) ctx = logging.WithLogger(ctx, logger) rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger}) diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 7f134db8abdb..8c187944a075 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -20,11 +20,12 @@ import ( "github.com/awslabs/karpenter/pkg/apis" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/awslabs/karpenter/pkg/utils/options" "k8s.io/client-go/kubernetes" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" - "knative.dev/pkg/injection" + knativeinjection "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/logging" "knative.dev/pkg/signals" @@ -41,8 +42,8 @@ var ( ) func main() { - config := injection.ParseAndGetRESTConfigOrDie() - ctx := webhook.WithOptions(injection.WithNamespaceScope(signals.NewContext(), system.Namespace()), webhook.Options{ + config := knativeinjection.ParseAndGetRESTConfigOrDie() + ctx := webhook.WithOptions(knativeinjection.WithNamespaceScope(signals.NewContext(), system.Namespace()), webhook.Options{ Port: opts.WebhookPort, ServiceName: "karpenter-webhook", SecretName: "karpenter-webhook-cert", @@ -91,5 +92,5 @@ func newConfigValidationController(ctx context.Context, cmw configmap.Watcher) * } func InjectContext(ctx context.Context) context.Context { - return options.Inject(ctx, opts) + return injection.WithOptions(ctx, opts) } diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go index f3dd01a91f05..23ca1a675281 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go @@ -20,7 +20,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/utils/functional" - "github.com/awslabs/karpenter/pkg/utils/options" + "github.com/awslabs/karpenter/pkg/utils/injection" v1 "k8s.io/api/core/v1" ) @@ -30,8 +30,8 @@ var ClusterDiscoveryTagKeyFormat = "kubernetes.io/cluster/%s" func (c *Constraints) Default(ctx context.Context) { c.defaultArchitecture() c.defaultCapacityTypes() - c.defaultSubnets(options.Get(ctx).ClusterName) - c.defaultSecurityGroups(options.Get(ctx).ClusterName) + c.defaultSubnets(injection.GetOptions(ctx).ClusterName) + c.defaultSecurityGroups(injection.GetOptions(ctx).ClusterName) } func (c *Constraints) defaultCapacityTypes() { diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go index 225f4c480220..4a58b76dce86 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go @@ -18,7 +18,7 @@ import ( "context" "fmt" - "github.com/awslabs/karpenter/pkg/utils/options" + "github.com/awslabs/karpenter/pkg/utils/injection" "knative.dev/pkg/apis" ) @@ -75,7 +75,7 @@ func (a *AWS) validateSecurityGroups() (errs *apis.FieldError) { func (a *AWS) validateTags(ctx context.Context) (errs *apis.FieldError) { // Avoiding a check on number of tags (hard limit of 50) since that limit is shared by user // defined and Karpenter tags, and the latter could change over time. - managedTags := ManagedTagsFor(options.Get(ctx).ClusterName) + managedTags := ManagedTagsFor(injection.GetOptions(ctx).ClusterName) for tagKey, tagValue := range a.Tags { if tagKey == "" { errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf( diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 618942ded709..d3c6b7cef9ba 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -33,7 +33,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" - "github.com/awslabs/karpenter/pkg/utils/options" + "github.com/awslabs/karpenter/pkg/utils/injection" ) type InstanceProvider struct { @@ -132,7 +132,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a TagSpecifications: []*ec2.TagSpecification{ { ResourceType: aws.String(ec2.ResourceTypeInstance), - Tags: v1alpha1.MergeTags(v1alpha1.ManagedTagsFor(options.Get(ctx).ClusterName), constraints.Tags), + Tags: v1alpha1.MergeTags(v1alpha1.ManagedTagsFor(injection.GetOptions(ctx).ClusterName), constraints.Tags), }, }, // OnDemandOptions are allowed to be specified even when requesting spot diff --git a/pkg/cloudprovider/aws/launchtemplate.go b/pkg/cloudprovider/aws/launchtemplate.go index c3be132daf6b..0849100cec30 100644 --- a/pkg/cloudprovider/aws/launchtemplate.go +++ b/pkg/cloudprovider/aws/launchtemplate.go @@ -29,8 +29,7 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/awslabs/karpenter/pkg/utils/functional" - "github.com/awslabs/karpenter/pkg/utils/options" - "github.com/awslabs/karpenter/pkg/utils/restconfig" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/mitchellh/hashstructure/v2" core "k8s.io/api/core/v1" "k8s.io/client-go/transport" @@ -109,7 +108,7 @@ func (p *LaunchTemplateProvider) Get(ctx context.Context, constraints *v1alpha1. // Ensure the launch template exists, or create it launchTemplate, err := p.ensureLaunchTemplate(ctx, &launchTemplateOptions{ UserData: userData, - ClusterName: options.Get(ctx).ClusterName, + ClusterName: injection.GetOptions(ctx).ClusterName, InstanceProfile: constraints.InstanceProfile, AMIID: amiID, SecurityGroupsIds: securityGroupsIds, @@ -234,9 +233,9 @@ func (p *LaunchTemplateProvider) getUserData(ctx context.Context, constraints *v exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1 /etc/eks/bootstrap.sh '%s' %s \ --apiserver-endpoint '%s'`, - options.Get(ctx).ClusterName, + injection.GetOptions(ctx).ClusterName, containerRuntimeArg, - options.Get(ctx).ClusterEndpoint)) + injection.GetOptions(ctx).ClusterEndpoint)) caBundle, err := p.GetCABundle(ctx) if err != nil { return "", fmt.Errorf("getting ca bundle for user data, %w", err) @@ -290,7 +289,7 @@ func (p *LaunchTemplateProvider) GetCABundle(ctx context.Context) (*string, erro // have used the simpler client-go InClusterConfig() method. // However, that only works when Karpenter is running as a Pod // within the same cluster it's managing. - restConfig := restconfig.Get(ctx) + restConfig := injection.GetConfig(ctx) if restConfig == nil { return nil, nil } diff --git a/pkg/cloudprovider/aws/securitygroups.go b/pkg/cloudprovider/aws/securitygroups.go index 35106e5440f5..ffb777bb1a4d 100644 --- a/pkg/cloudprovider/aws/securitygroups.go +++ b/pkg/cloudprovider/aws/securitygroups.go @@ -22,7 +22,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" - "github.com/awslabs/karpenter/pkg/utils/options" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/mitchellh/hashstructure/v2" "github.com/patrickmn/go-cache" "knative.dev/pkg/logging" @@ -105,7 +105,7 @@ func (s *SecurityGroupProvider) filterClusterTaggedGroups(ctx context.Context, s if s.hasClusterTag(ctx, securityGroup) { if foundClusterTag { logging.FromContext(ctx).Debugf("Ignoring security group %s, only one group with tag %s is allowed", aws.StringValue(securityGroup.GroupId), - fmt.Sprint(v1alpha1.ClusterDiscoveryTagKeyFormat, options.Get(ctx).ClusterName)) + fmt.Sprint(v1alpha1.ClusterDiscoveryTagKeyFormat, injection.GetOptions(ctx).ClusterName)) continue } foundClusterTag = true @@ -117,7 +117,7 @@ func (s *SecurityGroupProvider) filterClusterTaggedGroups(ctx context.Context, s func (s *SecurityGroupProvider) hasClusterTag(ctx context.Context, securityGroup *ec2.SecurityGroup) bool { for _, tag := range securityGroup.Tags { - if aws.StringValue(tag.Key) == fmt.Sprintf(v1alpha1.ClusterDiscoveryTagKeyFormat, options.Get(ctx).ClusterName) { + if aws.StringValue(tag.Key) == fmt.Sprintf(v1alpha1.ClusterDiscoveryTagKeyFormat, injection.GetOptions(ctx).ClusterName) { return true } } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 6137789a36f6..8b91e9832a98 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -27,6 +27,7 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/provisioning" "github.com/awslabs/karpenter/pkg/test" . "github.com/awslabs/karpenter/pkg/test/expectations" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/awslabs/karpenter/pkg/utils/options" "github.com/awslabs/karpenter/pkg/utils/parallel" "github.com/awslabs/karpenter/pkg/utils/resources" @@ -59,7 +60,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { - ctx = options.Inject(ctx, options.Options{ClusterName: "test-cluster", ClusterEndpoint: "https://test-cluster"}) + ctx = injection.WithOptions(ctx, options.Options{ClusterName: "test-cluster", ClusterEndpoint: "https://test-cluster"}) launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) fakeEC2API = &fake.EC2API{} subnetProvider := NewSubnetProvider(fakeEC2API) diff --git a/pkg/controllers/metrics/common.go b/pkg/controllers/metrics/common.go index c35266e7ffb9..345dee1336db 100644 --- a/pkg/controllers/metrics/common.go +++ b/pkg/controllers/metrics/common.go @@ -22,7 +22,7 @@ import ( ) const ( - controllerName = "Metrics" + controllerName = "metrics" metricSubsystemCapacity = "capacity" metricSubsystemPods = "pods" diff --git a/pkg/controllers/metrics/controller.go b/pkg/controllers/metrics/controller.go index 01c0c6357ddf..83b8e8238766 100644 --- a/pkg/controllers/metrics/controller.go +++ b/pkg/controllers/metrics/controller.go @@ -17,7 +17,6 @@ package metrics import ( "context" "fmt" - "strings" "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" @@ -48,7 +47,7 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr } func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - loggerName := fmt.Sprintf("%s.provisioner/%s", strings.ToLower(controllerName), req.Name) + loggerName := fmt.Sprintf("%s.provisioner/%s", controllerName, req.Name) ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(loggerName)) // Does the provisioner exist? diff --git a/pkg/controllers/node/controller.go b/pkg/controllers/node/controller.go index 797652a5810c..b8c9885e3e67 100644 --- a/pkg/controllers/node/controller.go +++ b/pkg/controllers/node/controller.go @@ -59,7 +59,7 @@ type Controller struct { // Reconcile executes a reallocation control loop for the resource func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("Node")) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("node").With("node", req.String())) // 1. Retrieve Node, ignore if not provisioned or terminating stored := &v1.Node{} if err := c.kubeClient.Get(ctx, req.NamespacedName, stored); err != nil { @@ -102,7 +102,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco // 4. Patch any changes, regardless of errors if !equality.Semantic.DeepEqual(node, stored) { if err := c.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { - return reconcile.Result{}, fmt.Errorf("patching node %s, %w", node.Name, err) + return reconcile.Result{}, fmt.Errorf("patching node, %w", err) } } // 5. Requeue if error or if retryAfter is set @@ -115,7 +115,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco func (c *Controller) Register(ctx context.Context, m manager.Manager) error { return controllerruntime. NewControllerManagedBy(m). - Named("Node"). + Named("node"). For(&v1.Node{}). Watches( // Reconcile all nodes related to a provisioner when it changes. diff --git a/pkg/controllers/node/emptiness.go b/pkg/controllers/node/emptiness.go index 14cc531e6344..deb9cc54490c 100644 --- a/pkg/controllers/node/emptiness.go +++ b/pkg/controllers/node/emptiness.go @@ -55,7 +55,7 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi if !empty { if hasEmptinessTimestamp { delete(n.Annotations, v1alpha5.EmptinessTimestampAnnotationKey) - logging.FromContext(ctx).Infof("Removed emptiness TTL from node %s", n.Name) + logging.FromContext(ctx).Infof("Removed emptiness TTL from node") } return reconcile.Result{}, nil } @@ -64,7 +64,7 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi ttl := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsAfterEmpty)) * time.Second if !hasEmptinessTimestamp { n.Annotations[v1alpha5.EmptinessTimestampAnnotationKey] = injectabletime.Now().Format(time.RFC3339) - logging.FromContext(ctx).Infof("Added TTL to empty node %s", n.Name) + logging.FromContext(ctx).Infof("Added TTL to empty node") return reconcile.Result{RequeueAfter: ttl}, nil } // 4. Delete node if beyond TTL @@ -73,9 +73,9 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi return reconcile.Result{}, fmt.Errorf("parsing emptiness timestamp, %s", emptinessTimestamp) } if injectabletime.Now().After(emptinessTime.Add(ttl)) { - logging.FromContext(ctx).Infof("Triggering termination after %s for empty node %s", ttl, n.Name) + logging.FromContext(ctx).Infof("Triggering termination after %s for empty node", ttl) if err := r.kubeClient.Delete(ctx, n); err != nil { - return reconcile.Result{}, fmt.Errorf("deleting node %s, %w", n.Name, err) + return reconcile.Result{}, fmt.Errorf("deleting node, %w", err) } } return reconcile.Result{}, nil @@ -84,7 +84,7 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi func (r *Emptiness) isEmpty(ctx context.Context, n *v1.Node) (bool, error) { pods := &v1.PodList{} if err := r.kubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": n.Name}); err != nil { - return false, fmt.Errorf("listing pods for node %s, %w", n.Name, err) + return false, fmt.Errorf("listing pods for node, %w", err) } for i := range pods.Items { p := pods.Items[i] diff --git a/pkg/controllers/node/expiration.go b/pkg/controllers/node/expiration.go index 9729aedd6fb0..9f5d15c67055 100644 --- a/pkg/controllers/node/expiration.go +++ b/pkg/controllers/node/expiration.go @@ -43,7 +43,7 @@ func (r *Expiration) Reconcile(ctx context.Context, provisioner *v1alpha5.Provis expirationTTL := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsUntilExpired)) * time.Second expirationTime := node.CreationTimestamp.Add(expirationTTL) if injectabletime.Now().After(expirationTime) { - logging.FromContext(ctx).Infof("Triggering termination for expired node %s after %s (+%s)", node.Name, expirationTTL, time.Since(expirationTime)) + logging.FromContext(ctx).Infof("Triggering termination for expired node after %s (+%s)", expirationTTL, time.Since(expirationTime)) if err := r.kubeClient.Delete(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("deleting node, %w", err) } diff --git a/pkg/controllers/node/liveness.go b/pkg/controllers/node/liveness.go index 128dd83efba1..477e71f2dc8b 100644 --- a/pkg/controllers/node/liveness.go +++ b/pkg/controllers/node/liveness.go @@ -49,9 +49,9 @@ func (r *Liveness) Reconcile(ctx context.Context, _ *v1alpha5.Provisioner, n *v1 if condition.Reason != "" && condition.Reason != "NodeStatusNeverUpdated" { return reconcile.Result{}, nil } - logging.FromContext(ctx).Infof("Triggering termination for node that failed to join %s", n.Name) + logging.FromContext(ctx).Infof("Triggering termination for node that failed to join") if err := r.kubeClient.Delete(ctx, n); err != nil { - return reconcile.Result{}, fmt.Errorf("deleting node %s, %w", n.Name, err) + return reconcile.Result{}, fmt.Errorf("deleting node, %w", err) } return reconcile.Result{}, nil } diff --git a/pkg/controllers/provisioning/binpacking/packer.go b/pkg/controllers/provisioning/binpacking/packer.go index 43ea230bee97..ed4d8df38915 100644 --- a/pkg/controllers/provisioning/binpacking/packer.go +++ b/pkg/controllers/provisioning/binpacking/packer.go @@ -24,6 +24,7 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/metrics" "github.com/awslabs/karpenter/pkg/utils/apiobject" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/awslabs/karpenter/pkg/utils/resources" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" @@ -38,7 +39,7 @@ var ( // MaxInstanceTypes defines the number of instance type options to return to the cloud provider MaxInstanceTypes = 20 - packDuration = prometheus.NewHistogram( + packDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.Namespace, Subsystem: "allocation_controller", @@ -46,6 +47,7 @@ var ( Help: "Duration of binpacking process in seconds.", Buckets: metrics.DurationBuckets(), }, + []string{metrics.ProvisionerLabel}, ) ) @@ -74,7 +76,7 @@ type Packing struct { // It follows the First Fit Decreasing bin packing technique, reference- // https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD) func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing { - defer metrics.Measure(packDuration)() + defer metrics.Measure(packDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Sort pods in decreasing order by the amount of CPU requested, if // CPU requested is equal compare memory requested. diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go index 4d78e751061f..38f947dc9dfd 100644 --- a/pkg/controllers/provisioning/controller.go +++ b/pkg/controllers/provisioning/controller.go @@ -33,6 +33,7 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/provisioning/binpacking" "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/utils/functional" + "github.com/awslabs/karpenter/pkg/utils/injection" ) // Controller for the resource @@ -60,6 +61,8 @@ func NewController(ctx context.Context, kubeClient client.Client, coreV1Client c // Reconcile a control loop for the resource func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("provisioning").With("provisioner", req.Name)) + ctx = injection.WithNamespacedName(ctx, req.NamespacedName) + provisioner := &v1alpha5.Provisioner{} if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil { if errors.IsNotFound(err) { diff --git a/pkg/controllers/provisioning/launcher.go b/pkg/controllers/provisioning/launcher.go index d8900d7a7fed..a22c80ad8f75 100644 --- a/pkg/controllers/provisioning/launcher.go +++ b/pkg/controllers/provisioning/launcher.go @@ -25,6 +25,7 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/provisioning/scheduling" "github.com/awslabs/karpenter/pkg/metrics" "github.com/awslabs/karpenter/pkg/utils/functional" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/prometheus/client_golang/prometheus" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" @@ -68,7 +69,7 @@ func (l *Launcher) Launch(ctx context.Context, schedules []*scheduling.Schedule, } func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { - defer metrics.Measure(bindTimeHistogram)() + defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Add the Karpenter finalizer to the node to enable the termination workflow node.Finalizers = append(node.Finalizers, v1alpha5.TerminationFinalizer) @@ -110,7 +111,7 @@ func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err return nil } -var bindTimeHistogram = prometheus.NewHistogram( +var bindTimeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.Namespace, Subsystem: "allocation_controller", @@ -118,6 +119,7 @@ var bindTimeHistogram = prometheus.NewHistogram( Help: "Duration of bind process in seconds. Broken down by result.", Buckets: metrics.DurationBuckets(), }, + []string{metrics.ProvisionerLabel}, ) func init() { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 6c241fc44363..da842bc8eba3 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -21,6 +21,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/metrics" + "github.com/awslabs/karpenter/pkg/utils/injection" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" appsv1 "k8s.io/api/apps/v1" @@ -69,7 +70,8 @@ func NewScheduler(kubeClient client.Client, cloudProvider cloudprovider.CloudPro } func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, pods []*v1.Pod) (schedules []*Schedule, err error) { - defer metrics.Measure(schedulingDuration.WithLabelValues(provisioner.Name))() + + defer metrics.Measure(schedulingDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index b4d2ff68cd67..8679aa2df167 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -58,7 +58,7 @@ func NewController(ctx context.Context, kubeClient client.Client, coreV1Client c // Reconcile executes a termination control loop for the resource func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("Termination")) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("termination").With("node", req.String())) // 1. Retrieve node from reconcile request node := &v1.Node{} @@ -95,7 +95,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco func (c *Controller) Register(_ context.Context, m manager.Manager) error { return controllerruntime. NewControllerManagedBy(m). - Named("Termination"). + Named("termination"). For(&v1.Node{}). WithOptions( controller.Options{ diff --git a/pkg/controllers/termination/terminate.go b/pkg/controllers/termination/terminate.go index 8a1912f1a40c..7ba8a10d11c0 100644 --- a/pkg/controllers/termination/terminate.go +++ b/pkg/controllers/termination/terminate.go @@ -50,7 +50,7 @@ func (t *Terminator) cordon(ctx context.Context, node *v1.Node) error { if err := t.KubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { return fmt.Errorf("patching node %s, %w", node.Name, err) } - logging.FromContext(ctx).Infof("Cordoned node %s", node.Name) + logging.FromContext(ctx).Infof("Cordoned node") return nil } @@ -59,14 +59,14 @@ func (t *Terminator) drain(ctx context.Context, node *v1.Node) (bool, error) { // 1. Get pods on node pods, err := t.getPods(ctx, node) if err != nil { - return false, fmt.Errorf("listing pods for node %s, %w", node.Name, err) + return false, fmt.Errorf("listing pods for node, %w", err) } // 2. Separate pods as non-critical and critical // https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown for _, pod := range pods { if val := pod.Annotations[v1alpha5.DoNotEvictPodAnnotationKey]; val == "true" { - logging.FromContext(ctx).Debugf("Unable to drain node %s, pod %s has do-not-evict annotation", node.Name, pod.Name) + logging.FromContext(ctx).Debugf("Unable to drain node, pod %s has do-not-evict annotation", pod.Name) return false, nil } } @@ -93,9 +93,9 @@ func (t *Terminator) terminate(ctx context.Context, node *v1.Node) error { if errors.IsNotFound(err) { return nil } - return fmt.Errorf("removing finalizer from node %s, %w", node.Name, err) + return fmt.Errorf("removing finalizer from node, %w", err) } - logging.FromContext(ctx).Infof("Deleted node %s", node.Name) + logging.FromContext(ctx).Infof("Deleted node") return nil } @@ -103,7 +103,7 @@ func (t *Terminator) terminate(ctx context.Context, node *v1.Node) error { func (t *Terminator) getPods(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { pods := &v1.PodList{} if err := t.KubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { - return nil, fmt.Errorf("listing pods on node %s, %w", node.Name, err) + return nil, fmt.Errorf("listing pods on node, %w", err) } return ptr.PodListToSlice(pods), nil } diff --git a/pkg/utils/injection/injection.go b/pkg/utils/injection/injection.go new file mode 100644 index 000000000000..95f81814bc46 --- /dev/null +++ b/pkg/utils/injection/injection.go @@ -0,0 +1,65 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "context" + + "github.com/awslabs/karpenter/pkg/utils/options" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" +) + +type resourceKey struct{} + +func WithNamespacedName(ctx context.Context, namespacedname types.NamespacedName) context.Context { + return context.WithValue(ctx, resourceKey{}, namespacedname) +} + +func GetNamespacedName(ctx context.Context) types.NamespacedName { + retval := ctx.Value(resourceKey{}) + if retval == nil { + return types.NamespacedName{} + } + return retval.(types.NamespacedName) +} + +type optionsKey struct{} + +func WithOptions(ctx context.Context, opts options.Options) context.Context { + return context.WithValue(ctx, optionsKey{}, opts) +} + +func GetOptions(ctx context.Context) options.Options { + retval := ctx.Value(optionsKey{}) + if retval == nil { + return options.Options{} + } + return retval.(options.Options) +} + +type configKey struct{} + +func WithConfig(ctx context.Context, config *rest.Config) context.Context { + return context.WithValue(ctx, configKey{}, config) +} + +func GetConfig(ctx context.Context) *rest.Config { + retval := ctx.Value(configKey{}) + if retval == nil { + return nil + } + return retval.(*rest.Config) +} diff --git a/pkg/utils/options/options.go b/pkg/utils/options/options.go index d929c1d3524b..6509878b719c 100644 --- a/pkg/utils/options/options.go +++ b/pkg/utils/options/options.go @@ -15,7 +15,6 @@ limitations under the License. package options import ( - "context" "flag" "fmt" "net/url" @@ -51,16 +50,6 @@ type Options struct { KubeClientBurst int } -type optionsKey struct{} - -func Get(ctx context.Context) Options { - return ctx.Value(optionsKey{}).(Options) -} - -func Inject(ctx context.Context, opts Options) context.Context { - return context.WithValue(ctx, optionsKey{}, opts) -} - func (o Options) Validate() (err error) { err = multierr.Append(err, o.validateEndpoint()) if o.ClusterName == "" { diff --git a/pkg/utils/restconfig/restconfig.go b/pkg/utils/restconfig/restconfig.go deleted file mode 100644 index ef0280ce02a8..000000000000 --- a/pkg/utils/restconfig/restconfig.go +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package restconfig - -import ( - "context" - - "k8s.io/client-go/rest" -) - -type contextKey struct{} - -func Inject(ctx context.Context, config *rest.Config) context.Context { - return context.WithValue(ctx, contextKey{}, config) -} - -func Get(ctx context.Context) *rest.Config { - retval := ctx.Value(contextKey{}) - if retval == nil { - return nil - } - return retval.(*rest.Config) -}