add more caching to clusterapi provider
This change adds logic to create informers for the infrastructure
machine templates that are discovered during the scale-from-zero checks.
It also adds tests and a slight change to the controller structure to
account for the dynamic informer creation.
elmiko committed Aug 17, 2022
1 parent 1a65fde commit f02c997
Showing 3 changed files with 37 additions and 23 deletions.
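
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the lazy informer caching this commit adopts, written against client-go's dynamic informer API. The helper name getViaInformerCache, its signature, and the choice to build the factory inside it are illustrative assumptions, not code from this commit (the controller builds its factory once in newMachineController and reuses it):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
)

// getViaInformerCache is a hypothetical helper (not from this commit) showing
// the lazily-created-informer pattern: ForResource registers an informer for
// the GVR on first use, Start launches any informers not yet running, and
// WaitForCacheSync blocks until the new cache is populated, after which reads
// go through the lister instead of hitting the API server.
func getViaInformerCache(client dynamic.Interface, stopCh <-chan struct{}, gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) {
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)

	// Creates the informer for this resource type if it does not exist yet.
	informer := factory.ForResource(gvr)

	// Safe to call repeatedly; informers that are already running are skipped.
	factory.Start(stopCh)

	if !cache.WaitForCacheSync(stopCh, informer.Informer().HasSynced) {
		return nil, fmt.Errorf("syncing cache for %s failed", gvr.String())
	}

	// Served from the local informer cache rather than a live GET.
	obj, err := informer.Lister().ByNamespace(namespace).Get(name)
	if err != nil {
		return nil, err
	}
	u, ok := obj.(*unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("unexpected type %T for %s/%s", obj, namespace, name)
	}
	return u, nil
}
```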
@@ -17,7 +17,6 @@ limitations under the License.
 package clusterapi
 
 import (
-	"context"
 	"fmt"
 	"os"
 	"strings"
@@ -82,6 +81,10 @@ type machineController struct {
 	machineDeploymentsAvailable bool
 	accessLock                  sync.Mutex
 	autoDiscoverySpecs          []*clusterAPIAutoDiscoveryConfig
+	// stopChannel is used for running the shared informers, and for starting
+	// informers associated with infrastructure machine templates that are
+	// discovered during operation.
+	stopChannel <-chan struct{}
 }
 
 func indexMachineByProviderID(obj interface{}) ([]string, error) {
@@ -172,9 +175,9 @@ func (c *machineController) findMachineSetOwner(machineSet *unstructured.Unstruc
 
 // run starts shared informers and waits for the informer cache to
 // synchronize.
-func (c *machineController) run(stopCh <-chan struct{}) error {
-	c.workloadInformerFactory.Start(stopCh)
-	c.managementInformerFactory.Start(stopCh)
+func (c *machineController) run() error {
+	c.workloadInformerFactory.Start(c.stopChannel)
+	c.managementInformerFactory.Start(c.stopChannel)
 
 	syncFuncs := []cache.InformerSynced{
 		c.nodeInformer.HasSynced,
@@ -186,7 +189,7 @@ func (c *machineController) run(stopCh <-chan struct{}) error {
 	}
 
 	klog.V(4).Infof("waiting for caches to sync")
-	if !cache.WaitForCacheSync(stopCh, syncFuncs...) {
+	if !cache.WaitForCacheSync(c.stopChannel, syncFuncs...) {
 		return fmt.Errorf("syncing caches failed")
 	}
 
@@ -329,6 +332,7 @@ func newMachineController(
 	managementDiscoveryClient discovery.DiscoveryInterface,
 	managementScaleClient scale.ScalesGetter,
 	discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
+	stopChannel chan struct{},
 ) (*machineController, error) {
 	workloadInformerFactory := kubeinformers.NewSharedInformerFactory(workloadClient, 0)
 
@@ -411,6 +415,7 @@
 		machineResource:             gvrMachine,
 		machineDeploymentResource:   gvrMachineDeployment,
 		machineDeploymentsAvailable: machineDeploymentAvailable,
+		stopChannel:                 stopChannel,
 	}, nil
 }
 
@@ -713,18 +718,27 @@ func (c *machineController) allowedByAutoDiscoverySpecs(r *unstructured.Unstruct
 
 // Get an infrastructure machine template given its GVR, name, and namespace.
 func (c *machineController) getInfrastructureResource(resource schema.GroupVersionResource, name string, namespace string) (*unstructured.Unstructured, error) {
-	infra, err := c.managementClient.
-		Resource(resource).
-		Namespace(namespace).
-		Get(
-			context.Background(),
-			name,
-			metav1.GetOptions{},
-		)
+	// get an informer for this type, this will create the informer if it does not exist
+	informer := c.managementInformerFactory.ForResource(resource)
+	// since this may be a new informer, we need to restart the informer factory
+	c.managementInformerFactory.Start(c.stopChannel)
+	// wait for the informer to sync
+	klog.V(4).Infof("waiting for cache sync on infrastructure resource")
+	if !cache.WaitForCacheSync(c.stopChannel, informer.Informer().HasSynced) {
+		return nil, fmt.Errorf("syncing cache on infrastructure resource failed")
+	}
+	// use the informer to get the object we want, this will use the informer cache if possible
+	obj, err := informer.Lister().ByNamespace(namespace).Get(name)
 	if err != nil {
-		klog.V(4).Infof("Unable to read infrastructure reference, error: %v", err)
+		klog.V(4).Infof("Unable to read infrastructure reference from informer, error: %v", err)
 		return nil, err
 	}
 
+	infra, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		err := fmt.Errorf("Unable to convert infrastructure reference for %s/%s", namespace, name)
+		klog.V(4).Infof("%v", err)
+		return nil, err
+	}
 	return infra, err
 }
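
A hedged usage sketch of the hypothetical helper shown earlier; the GVR, namespace, and template name below are invented for illustration. The design trade-off mirrors the commit: the first lookup for a given template type pays the cost of starting a watch and syncing its cache, but subsequent scale-from-zero checks are served from the local cache rather than live GETs against the API server, which is what the in-diff comment means by "this will use the informer cache if possible".

```go
// Hypothetical caller; dynClient is an assumed dynamic.Interface.
gvr := schema.GroupVersionResource{
	Group:    "infrastructure.cluster.x-k8s.io",
	Version:  "v1beta1",
	Resource: "dockermachinetemplates", // example resource, for illustration
}
stopCh := make(chan struct{})
defer close(stopCh)

tmpl, err := getViaInformerCache(dynClient, stopCh, gvr, "default", "example-template")
if err != nil {
	klog.Fatalf("template lookup failed: %v", err)
}
fmt.Printf("resolved infrastructure template %q\n", tmpl.GetName())
```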
@@ -250,13 +250,13 @@ func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machin
 	}
 	scaleClient.AddReactor("*", "*", scaleReactor)
 
-	controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient, scaleClient, cloudprovider.NodeGroupDiscoveryOptions{})
+	stopCh := make(chan struct{})
+	controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient, scaleClient, cloudprovider.NodeGroupDiscoveryOptions{}, stopCh)
 	if err != nil {
 		t.Fatal("failed to create test controller")
 	}
 
-	stopCh := make(chan struct{})
-	if err := controller.run(stopCh); err != nil {
+	if err := controller.run(); err != nil {
 		t.Fatalf("failed to run controller: %v", err)
 	}
 
@@ -180,16 +180,16 @@ func BuildClusterAPI(opts config.AutoscalingOptions, do cloudprovider.NodeGroupD
 		klog.Fatalf("create scale client failed: %v", err)
 	}
 
-	controller, err := newMachineController(managementClient, workloadClient, managementDiscoveryClient, managementScaleClient, do)
-	if err != nil {
-		klog.Fatal(err)
-	}
-
 	// Ideally this would be passed in but the builder is not
 	// currently organised to do so.
 	stopCh := make(chan struct{})
 
-	if err := controller.run(stopCh); err != nil {
+	controller, err := newMachineController(managementClient, workloadClient, managementDiscoveryClient, managementScaleClient, do, stopCh)
+	if err != nil {
+		klog.Fatal(err)
+	}
+
+	if err := controller.run(); err != nil {
 		klog.Fatal(err)
 	}
 
