allow scaling custom resource
wjiec committed Dec 1, 2024
1 parent c07ea1d · commit ac11361
Showing 6 changed files with 168 additions and 23 deletions.
26 changes: 21 additions & 5 deletions internal/dao/registry.go
@@ -11,7 +11,6 @@ import (

	"github.com/derailed/k9s/internal/client"
	"github.com/rs/zerolog/log"
-	apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
@@ -20,10 +19,11 @@ import (
)

const (
-	crdCat  = "crd"
-	k9sCat  = "k9s"
-	helmCat = "helm"
-	crdGVR  = "apiextensions.k8s.io/v1/customresourcedefinitions"
+	crdCat   = "crd"
+	k9sCat   = "k9s"
+	helmCat  = "helm"
+	scaleCat = "scale"
+	crdGVR   = "apiextensions.k8s.io/v1/customresourcedefinitions"
)

// MetaAccess tracks resources metadata.
@@ -96,6 +96,9 @@ func AccessorFor(f Factory, gvr client.GVR) (Accessor, error) {
	r, ok := m[gvr]
	if !ok {
		r = new(Generic)
+		if MetaAccess.IsScalable(gvr) {
+			r = new(Scaler)
+		}
		log.Debug().Msgf("No DAO registry entry for %q. Using generics!", gvr)
	}
	r.Init(f, gvr)
@@ -198,6 +201,19 @@ func (m *Meta) LoadResources(f Factory) error {
	return nil
}

+// IsScalable checks whether the resource can be scaled.
+func (m *Meta) IsScalable(gvr client.GVR) bool {
+	if meta, ok := m.resMetas[gvr]; ok {
+		for _, c := range meta.Categories {
+			if c == scaleCat {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
// BOZO!! Need countermeasures for direct commands!
func loadNonResource(m ResourceMetas) {
	loadK9s(m)
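The registry change is the hinge of the commit: a resource advertises scalability through a "scale" category on its API metadata, and AccessorFor then hands back a Scaler instead of a plain Generic. Below is a minimal sketch of that lookup, assuming metadata is held as metav1.APIResource values (the type the Categories loop suggests) and simplifying the GVR key to a plain string; for a CRD, the Categories list would mirror spec.names.categories in its manifest:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const scaleCat = "scale"

// isScalable mirrors Meta.IsScalable from the diff, with the GVR key
// simplified to a string so the sketch stays self-contained.
func isScalable(metas map[string]metav1.APIResource, gvr string) bool {
	if meta, ok := metas[gvr]; ok {
		for _, c := range meta.Categories {
			if c == scaleCat {
				return true
			}
		}
	}
	return false
}

func main() {
	metas := map[string]metav1.APIResource{
		// For a CRD, Categories typically mirrors spec.names.categories.
		"example.com/v1/widgets": {Name: "widgets", Categories: []string{"all", scaleCat}},
	}
	fmt.Println(isScalable(metas, "example.com/v1/widgets")) // true
	fmt.Println(isScalable(metas, "v1/pods"))                // false (not registered here)
}
```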
77 changes: 77 additions & 0 deletions internal/dao/scalable.go
@@ -0,0 +1,77 @@
+package dao
+
+import (
+	"context"
+
+	"github.com/derailed/k9s/internal/client"
+	"github.com/rs/zerolog/log"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
+)
+
+var _ Scalable = (*Scaler)(nil)
+var _ ReplicasGetter = (*Scaler)(nil)
+
+// Scaler represents a generic resource with scaling.
+type Scaler struct {
+	Generic
+}
+
+// Replicas returns the number of replicas for the resource located at the given path.
+func (s *Scaler) Replicas(ctx context.Context, path string) (int32, error) {
+	scaleClient, err := s.scaleClient()
+	if err != nil {
+		return 0, err
+	}
+
+	ns, name := client.Namespaced(path)
+	currScale, err := scaleClient.Scales(ns).Get(ctx, *s.gvr.GR(), name, metav1.GetOptions{})
+	if err != nil {
+		return 0, err
+	}
+
+	return currScale.Spec.Replicas, nil
+}
+
+// Scale modifies the number of replicas for a given resource specified by the path.
+func (s *Scaler) Scale(ctx context.Context, path string, replicas int32) error {
+	ns, name := client.Namespaced(path)
+
+	scaleClient, err := s.scaleClient()
+	if err != nil {
+		return err
+	}
+
+	currentScale, err := scaleClient.Scales(ns).Get(ctx, *s.gvr.GR(), name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	currentScale.Spec.Replicas = replicas
+	updatedScale, err := scaleClient.Scales(ns).Update(ctx, *s.gvr.GR(), currentScale, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+
+	log.Debug().Msgf("%s scaled to %d", path, updatedScale.Spec.Replicas)
+	return nil
+}
+
+func (s *Scaler) scaleClient() (scale.ScalesGetter, error) {
+	cfg, err := s.Client().RestConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	discoveryClient, err := s.Client().CachedDiscovery()
+	if err != nil {
+		return nil, err
+	}
+
+	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
+	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient)
+
+	return scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
+}
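The new scaleClient helper is the standard client-go recipe: discovery feeds both a deferred REST mapper and a scale-kind resolver, and scale.NewForConfig combines them into one client that can scale any resource exposing the scale subresource. Here is a self-contained sketch of the same wiring outside k9s; the kubeconfig path, namespace, resource name, and the example.com widgets GroupResource are assumptions for illustration:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the local kubeconfig (assumed at the default location).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// Cached discovery backs both the REST mapper and the scale-kind resolver.
	disco := memory.NewMemCacheClient(kubernetes.NewForConfigOrDie(cfg).Discovery())
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(disco)
	resolver := scale.NewDiscoveryScaleKindResolver(disco)

	sc, err := scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
	if err != nil {
		panic(err)
	}

	// Hypothetical custom resource; any CRD with a scale subresource works.
	gr := schema.GroupResource{Group: "example.com", Resource: "widgets"}

	cur, err := sc.Scales("default").Get(context.TODO(), gr, "my-widget", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("current replicas:", cur.Spec.Replicas)

	cur.Spec.Replicas = 3
	if _, err := sc.Scales("default").Update(context.TODO(), gr, cur, metav1.UpdateOptions{}); err != nil {
		panic(err)
	}
}
```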
6 changes: 6 additions & 0 deletions internal/dao/types.go
@@ -121,6 +121,12 @@ type Scalable interface {
	Scale(ctx context.Context, path string, replicas int32) error
}

+// ReplicasGetter represents a resource with replicas.
+type ReplicasGetter interface {
+	// Replicas returns the number of replicas for the resource located at the given path.
+	Replicas(ctx context.Context, path string) (int32, error)
+}
+
// Controller represents a pod controller.
type Controller interface {
	// Pod returns a pod instance matching the selector.
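Scaler satisfies both Scalable and ReplicasGetter while Generic satisfies neither, so callers discover the capability with a type assertion rather than a registry lookup. A hypothetical helper (not part of this commit) showing the pattern from inside the dao package:

```go
package dao

import (
	"context"
	"fmt"
)

// replicasFor reports the current replica count when the accessor opts in
// to ReplicasGetter. Hypothetical helper, not part of this commit.
func replicasFor(ctx context.Context, a Accessor, path string) (int32, error) {
	rg, ok := a.(ReplicasGetter)
	if !ok {
		return 0, fmt.Errorf("%T does not implement ReplicasGetter", a)
	}
	return rg.Replicas(ctx, path)
}
```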
7 changes: 6 additions & 1 deletion internal/view/command.go
@@ -288,7 +288,12 @@ func (c *Command) viewMetaFor(p *cmd.Interpreter) (client.GVR, *MetaViewer, error) {

	v := MetaViewer{
		viewerFn: func(gvr client.GVR) ResourceViewer {
-			return NewOwnerExtender(NewBrowser(gvr))
+			viewer := NewOwnerExtender(NewBrowser(gvr))
+			if dao.MetaAccess.IsScalable(gvr) {
+				viewer = NewScaleExtender(viewer)
+			}
+
+			return viewer
		},
	}
	if mv, ok := customViewers[gvr]; ok {
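viewMetaFor builds browsers by stacking decorators, and the commit simply adds one more layer when the GVR is scalable. A toy, standalone version of that decorator chain (the types here are stand-ins, not the real k9s ones):

```go
package main

import "fmt"

// ResourceViewer is reduced to a printable name for this sketch.
type ResourceViewer interface{ Name() string }

type Browser struct{ gvr string }

func (b Browser) Name() string { return "browser(" + b.gvr + ")" }

// ScaleExtender wraps any viewer, mirroring how extenders layer key bindings.
type ScaleExtender struct{ ResourceViewer }

func (s ScaleExtender) Name() string { return "scale(" + s.ResourceViewer.Name() + ")" }

func main() {
	var v ResourceViewer = Browser{gvr: "example.com/v1/widgets"}
	if scalable := true; scalable { // stands in for dao.MetaAccess.IsScalable(gvr)
		v = ScaleExtender{v}
	}
	fmt.Println(v.Name()) // scale(browser(example.com/v1/widgets))
}
```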
61 changes: 54 additions & 7 deletions internal/view/scale_extender.go
@@ -83,22 +83,69 @@ func (s *ScaleExtender) valueOf(col string) (string, error) {
	return s.GetTable().GetSelectedCell(colIdx), nil
}

+func (s *ScaleExtender) replicasFromReady(_ string) (string, error) {
+	replicas, err := s.valueOf("READY")
+	if err != nil {
+		return "", err
+	}
+
+	tokens := strings.Split(replicas, "/")
+	if len(tokens) < 2 {
+		return "", fmt.Errorf("unable to locate replicas from %s", replicas)
+	}
+
+	return strings.TrimRight(tokens[1], ui.DeltaSign), nil
+}
+
+func (s *ScaleExtender) replicasFromScaleSubresource(sel string) (string, error) {
+	res, err := dao.AccessorFor(s.App().factory, s.GVR())
+	if err != nil {
+		return "", err
+	}
+
+	replicasGetter, ok := res.(dao.ReplicasGetter)
+	if !ok {
+		return "", fmt.Errorf("expecting a ReplicasGetter resource for %q", s.GVR())
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), s.App().Conn().Config().CallTimeout())
+	defer cancel()
+
+	replicas, err := replicasGetter.Replicas(ctx, sel)
+	if err != nil {
+		return "", err
+	}
+
+	return strconv.Itoa(int(replicas)), nil
+}
+
func (s *ScaleExtender) makeScaleForm(sels []string) (*tview.Form, error) {
	styles := s.App().Styles.Dialog()
	f := s.makeStyledForm(styles)

	factor := "0"
	if len(sels) == 1 {
-		replicas, err := s.valueOf("READY")
-		if err != nil {
-			return nil, err
+		// If the resource supports scaling, first try to read the replicas
+		// directly from its scale subresource.
+		if dao.MetaAccess.IsScalable(s.GVR()) {
+			replicas, err := s.replicasFromScaleSubresource(sels[0])
+			if err == nil && len(replicas) != 0 {
+				factor = replicas
+			}
		}
-		tokens := strings.Split(replicas, "/")
-		if len(tokens) < 2 {
-			return nil, fmt.Errorf("unable to locate replicas from %s", replicas)
+
+		// For built-in resources, or when the replicas cannot be read from the
+		// scale subresource, fall back to parsing the READY column.
+		if factor == "0" {
+			replicas, err := s.replicasFromReady(sels[0])
+			if err != nil {
+				return nil, err
+			}
+
+			factor = replicas
		}
-		factor = strings.TrimRight(tokens[1], ui.DeltaSign)
	}

	f.AddInputField("Replicas:", factor, 4, func(textToCheck string, lastChar rune) bool {
		_, err := strconv.Atoi(textToCheck)
		return err == nil
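The fallback path still parses the table's READY column, where a cell like "2/3" reads as current/desired replicas, possibly suffixed by a change marker (ui.DeltaSign). A standalone sketch of that parsing, where the delta rune is an assumption rather than necessarily the one k9s uses:

```go
package main

import (
	"fmt"
	"strings"
)

// deltaSign stands in for ui.DeltaSign; the exact marker is an assumption.
const deltaSign = "Δ"

// replicasFromReady mirrors the fallback in the diff: "2/3Δ" yields "3".
func replicasFromReady(ready string) (string, error) {
	tokens := strings.Split(ready, "/")
	if len(tokens) < 2 {
		return "", fmt.Errorf("unable to locate replicas from %s", ready)
	}
	return strings.TrimRight(tokens[1], deltaSign), nil
}

func main() {
	for _, cell := range []string{"2/3", "2/3Δ", "bogus"} {
		n, err := replicasFromReady(cell)
		fmt.Printf("%-6s -> %q err=%v\n", cell, n, err)
	}
}
```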
14 changes: 4 additions & 10 deletions internal/watch/factory.go
@@ -20,8 +20,7 @@ import (
)

const (
-	defaultResync   = 10 * time.Minute
-	defaultWaitTime = 250 * time.Millisecond
+	defaultResync = 10 * time.Minute
)

// Factory tracks various resource informers.
@@ -47,7 +46,7 @@ func (f *Factory) Start(ns string) {
	f.mx.Lock()
	defer f.mx.Unlock()

-	log.Debug().Msgf("Factory START with ns `%q", ns)
+	log.Debug().Msgf("Factory START with ns %q", ns)
	f.stopChan = make(chan struct{})
	for ns, fac := range f.factories {
		log.Debug().Msgf("Starting factory in ns %q", ns)
@@ -143,13 +142,8 @@ func (f *Factory) waitForCacheSync(ns string) {
		return
	}

-	// Hang for a sec for the cache to refresh if still not done bail out!
-	c := make(chan struct{})
-	go func(c chan struct{}) {
-		<-time.After(defaultWaitTime)
-		close(c)
-	}(c)
-	_ = fac.WaitForCacheSync(c)
+	// We must block until the caches of all started informers have synced.
+	_ = fac.WaitForCacheSync(f.stopChan)
}

// WaitForCacheSync waits for all factories to update their cache.
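The factory fix replaces a fixed 250 ms timeout with a real block on cache sync: WaitForCacheSync returns only once every started informer has synced, or when the stop channel closes. A self-contained sketch of those semantics, using a fake clientset as a stand-in for the live cluster connection k9s holds:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	cs := fake.NewSimpleClientset()
	fac := informers.NewSharedInformerFactory(cs, 10*time.Minute)
	_ = fac.Core().V1().Pods().Informer() // register at least one informer

	stop := make(chan struct{})
	defer close(stop)
	fac.Start(stop)

	// Blocks until every started informer has synced (or stop closes),
	// which is exactly the guarantee the diff now relies on.
	synced := fac.WaitForCacheSync(stop)
	fmt.Println("caches synced:", synced)
}
```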
