From ac113612ec4dcc7da670c9114f76bc12baaacb47 Mon Sep 17 00:00:00 2001 From: jayson wang Date: Sun, 1 Dec 2024 15:21:44 +0800 Subject: [PATCH] allow scaling custom resource --- internal/dao/registry.go | 26 ++++++++--- internal/dao/scalable.go | 77 +++++++++++++++++++++++++++++++++ internal/dao/types.go | 6 +++ internal/view/command.go | 7 ++- internal/view/scale_extender.go | 61 +++++++++++++++++++++++--- internal/watch/factory.go | 14 ++---- 6 files changed, 168 insertions(+), 23 deletions(-) create mode 100644 internal/dao/scalable.go diff --git a/internal/dao/registry.go b/internal/dao/registry.go index f512bc60cd..e65858e65d 100644 --- a/internal/dao/registry.go +++ b/internal/dao/registry.go @@ -11,7 +11,6 @@ import ( "github.com/derailed/k9s/internal/client" "github.com/rs/zerolog/log" - apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -20,10 +19,11 @@ import ( ) const ( - crdCat = "crd" - k9sCat = "k9s" - helmCat = "helm" - crdGVR = "apiextensions.k8s.io/v1/customresourcedefinitions" + crdCat = "crd" + k9sCat = "k9s" + helmCat = "helm" + scaleCat = "scale" + crdGVR = "apiextensions.k8s.io/v1/customresourcedefinitions" ) // MetaAccess tracks resources metadata. @@ -96,6 +96,9 @@ func AccessorFor(f Factory, gvr client.GVR) (Accessor, error) { r, ok := m[gvr] if !ok { r = new(Generic) + if MetaAccess.IsScalable(gvr) { + r = new(Scaler) + } log.Debug().Msgf("No DAO registry entry for %q. Using generics!", gvr) } r.Init(f, gvr) @@ -198,6 +201,19 @@ func (m *Meta) LoadResources(f Factory) error { return nil } +// IsScalable checks whether the resource can be scaled. +func (m *Meta) IsScalable(gvr client.GVR) bool { + if meta, ok := m.resMetas[gvr]; ok { + for _, c := range meta.Categories { + if c == scaleCat { + return true + } + } + } + + return false +} + // BOZO!! 
Need countermeasures for direct commands! func loadNonResource(m ResourceMetas) { loadK9s(m) diff --git a/internal/dao/scalable.go b/internal/dao/scalable.go new file mode 100644 index 0000000000..6c13f45d3c --- /dev/null +++ b/internal/dao/scalable.go @@ -0,0 +1,77 @@ +package dao + +import ( + "context" + + "github.com/derailed/k9s/internal/client" + "github.com/rs/zerolog/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" +) + +var _ Scalable = (*Scaler)(nil) +var _ ReplicasGetter = (*Scaler)(nil) + +// Scaler represents a generic resource with scaling. +type Scaler struct { + Generic +} + +// Replicas returns the number of replicas for the resource located at the given path. +func (s *Scaler) Replicas(ctx context.Context, path string) (int32, error) { + scaleClient, err := s.scaleClient() + if err != nil { + return 0, err + } + + ns, name := client.Namespaced(path) + currScale, err := scaleClient.Scales(ns).Get(ctx, *s.gvr.GR(), name, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + return currScale.Spec.Replicas, nil +} + +// Scale modifies the number of replicas for a given resource specified by the path. 
+func (s *Scaler) Scale(ctx context.Context, path string, replicas int32) error { + ns, name := client.Namespaced(path) + + scaleClient, err := s.scaleClient() + if err != nil { + return err + } + + currentScale, err := scaleClient.Scales(ns).Get(ctx, *s.gvr.GR(), name, metav1.GetOptions{}) + if err != nil { + return err + } + + currentScale.Spec.Replicas = replicas + updatedScale, err := scaleClient.Scales(ns).Update(ctx, *s.gvr.GR(), currentScale, metav1.UpdateOptions{}) + if err != nil { + return err + } + + log.Debug().Msgf("%s scaled to %d", path, updatedScale.Spec.Replicas) + return nil +} + +func (s *Scaler) scaleClient() (scale.ScalesGetter, error) { + cfg, err := s.Client().RestConfig() + if err != nil { + return nil, err + } + + discoveryClient, err := s.Client().CachedDiscovery() + if err != nil { + return nil, err + } + + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient) + + return scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) +} diff --git a/internal/dao/types.go b/internal/dao/types.go index 8afa480fdc..014c668e4c 100644 --- a/internal/dao/types.go +++ b/internal/dao/types.go @@ -121,6 +121,12 @@ type Scalable interface { Scale(ctx context.Context, path string, replicas int32) error } +// ReplicasGetter represents a resource with replicas. +type ReplicasGetter interface { + // Replicas returns the number of replicas for the resource located at the given path. + Replicas(ctx context.Context, path string) (int32, error) +} + // Controller represents a pod controller. type Controller interface { // Pod returns a pod instance matching the selector. 
diff --git a/internal/view/command.go b/internal/view/command.go index bb81826c26..abdf09ab26 100644 --- a/internal/view/command.go +++ b/internal/view/command.go @@ -288,7 +288,12 @@ func (c *Command) viewMetaFor(p *cmd.Interpreter) (client.GVR, *MetaViewer, erro v := MetaViewer{ viewerFn: func(gvr client.GVR) ResourceViewer { - return NewOwnerExtender(NewBrowser(gvr)) + viewer := NewOwnerExtender(NewBrowser(gvr)) + if dao.MetaAccess.IsScalable(gvr) { + viewer = NewScaleExtender(viewer) + } + + return viewer }, } if mv, ok := customViewers[gvr]; ok { diff --git a/internal/view/scale_extender.go b/internal/view/scale_extender.go index e402bd5ae8..45fddaf9ba 100644 --- a/internal/view/scale_extender.go +++ b/internal/view/scale_extender.go @@ -83,22 +83,69 @@ func (s *ScaleExtender) valueOf(col string) (string, error) { return s.GetTable().GetSelectedCell(colIdx), nil } +func (s *ScaleExtender) replicasFromReady(_ string) (string, error) { + replicas, err := s.valueOf("READY") + if err != nil { + return "", err + } + + tokens := strings.Split(replicas, "/") + if len(tokens) < 2 { + return "", fmt.Errorf("unable to locate replicas from %s", replicas) + } + + return strings.TrimRight(tokens[1], ui.DeltaSign), nil +} + +func (s *ScaleExtender) replicasFromScaleSubresource(sel string) (string, error) { + res, err := dao.AccessorFor(s.App().factory, s.GVR()) + if err != nil { + return "", err + } + + replicasGetter, ok := res.(dao.ReplicasGetter) + if !ok { + return "", fmt.Errorf("expecting a replicasGetter resource for %q", s.GVR()) + } + + ctx, cancel := context.WithTimeout(context.Background(), s.App().Conn().Config().CallTimeout()) + defer cancel() + + replicas, err := replicasGetter.Replicas(ctx, sel) + if err != nil { + return "", err + } + + return strconv.Itoa(int(replicas)), nil +} + func (s *ScaleExtender) makeScaleForm(sels []string) (*tview.Form, error) { styles := s.App().Styles.Dialog() f := s.makeStyledForm(styles) factor := "0" if len(sels) == 1 { - 
replicas, err := s.valueOf("READY") - if err != nil { - return nil, err + // If the CRD resource supports scaling, then first try to + // read the replicas directly from the CRD. + if dao.MetaAccess.IsScalable(s.GVR()) { + replicas, err := s.replicasFromScaleSubresource(sels[0]) + if err == nil && len(replicas) != 0 { + factor = replicas + } } - tokens := strings.Split(replicas, "/") - if len(tokens) < 2 { - return nil, fmt.Errorf("unable to locate replicas from %s", replicas) + + // For built-in resources or cases where we can't get the replicas from the CRD, we can + // only try to get the number of copies from the READY field. + if factor == "0" { + replicas, err := s.replicasFromReady(sels[0]) + if err != nil { + return nil, err + } + + factor = replicas } - factor = strings.TrimRight(tokens[1], ui.DeltaSign) } + f.AddInputField("Replicas:", factor, 4, func(textToCheck string, lastChar rune) bool { _, err := strconv.Atoi(textToCheck) return err == nil diff --git a/internal/watch/factory.go b/internal/watch/factory.go index 6c896afa6d..698b17a458 100644 --- a/internal/watch/factory.go +++ b/internal/watch/factory.go @@ -20,8 +20,7 @@ import ( ) const ( - defaultResync = 10 * time.Minute - defaultWaitTime = 250 * time.Millisecond + defaultResync = 10 * time.Minute ) // Factory tracks various resource informers. @@ -47,7 +46,7 @@ func (f *Factory) Start(ns string) { f.mx.Lock() defer f.mx.Unlock() - log.Debug().Msgf("Factory START with ns `%q", ns) + log.Debug().Msgf("Factory START with ns %q", ns) f.stopChan = make(chan struct{}) for ns, fac := range f.factories { log.Debug().Msgf("Starting factory in ns %q", ns) @@ -143,13 +142,8 @@ func (f *Factory) waitForCacheSync(ns string) { return } - // Hang for a sec for the cache to refresh if still not done bail out! 
- c := make(chan struct{})
- go func(c chan struct{}) {
- <-time.After(defaultWaitTime)
- close(c)
- }(c)
- _ = fac.WaitForCacheSync(c)
+ // we must block until all started informers' caches are synced
+ _ = fac.WaitForCacheSync(f.stopChan)
}

// WaitForCacheSync waits for all factories to update their cache.