diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go index 1004aa0bde94..0f0004e447be 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go @@ -17,7 +17,6 @@ limitations under the License. package clusterapi import ( - "context" "fmt" "os" "strings" @@ -82,6 +81,10 @@ type machineController struct { machineDeploymentsAvailable bool accessLock sync.Mutex autoDiscoverySpecs []*clusterAPIAutoDiscoveryConfig + // stopChannel is used for running the shared informers, and for starting + // informers associated with infrastructure machine templates that are + // discovered during operation. + stopChannel <-chan struct{} } func indexMachineByProviderID(obj interface{}) ([]string, error) { @@ -172,9 +175,9 @@ func (c *machineController) findMachineSetOwner(machineSet *unstructured.Unstruc // run starts shared informers and waits for the informer cache to // synchronize. -func (c *machineController) run(stopCh <-chan struct{}) error { - c.workloadInformerFactory.Start(stopCh) - c.managementInformerFactory.Start(stopCh) +func (c *machineController) run() error { + c.workloadInformerFactory.Start(c.stopChannel) + c.managementInformerFactory.Start(c.stopChannel) syncFuncs := []cache.InformerSynced{ c.nodeInformer.HasSynced, @@ -186,7 +189,7 @@ func (c *machineController) run(stopCh <-chan struct{}) error { } klog.V(4).Infof("waiting for caches to sync") - if !cache.WaitForCacheSync(stopCh, syncFuncs...) { + if !cache.WaitForCacheSync(c.stopChannel, syncFuncs...) { return fmt.Errorf("syncing caches failed") } @@ -329,6 +332,7 @@ func newMachineController( managementDiscoveryClient discovery.DiscoveryInterface, managementScaleClient scale.ScalesGetter, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, + stopChannel chan struct{}, ) (*machineController, error) { workloadInformerFactory := kubeinformers.NewSharedInformerFactory(workloadClient, 0) @@ -411,6 +415,7 @@ func newMachineController( machineResource: gvrMachine, machineDeploymentResource: gvrMachineDeployment, machineDeploymentsAvailable: machineDeploymentAvailable, + stopChannel: stopChannel, }, nil } @@ -713,18 +718,27 @@ func (c *machineController) allowedByAutoDiscoverySpecs(r *unstructured.Unstruct // Get an infrastructure machine template given its GVR, name, and namespace. func (c *machineController) getInfrastructureResource(resource schema.GroupVersionResource, name string, namespace string) (*unstructured.Unstructured, error) { - infra, err := c.managementClient. - Resource(resource). - Namespace(namespace). - Get( - context.Background(), - name, - metav1.GetOptions{}, - ) + // get an informer for this type, this will create the informer if it does not exist + informer := c.managementInformerFactory.ForResource(resource) + // since this may be a new informer, we need to restart the informer factory + c.managementInformerFactory.Start(c.stopChannel) + // wait for the informer to sync + klog.V(4).Infof("waiting for cache sync on infrastructure resource") + if !cache.WaitForCacheSync(c.stopChannel, informer.Informer().HasSynced) { + return nil, fmt.Errorf("syncing cache on infrastructure resource failed") + } + // use the informer to get the object we want, this will use the informer cache if possible + obj, err := informer.Lister().ByNamespace(namespace).Get(name) if err != nil { - klog.V(4).Infof("Unable to read infrastructure reference, error: %v", err) + klog.V(4).Infof("Unable to read infrastructure reference from informer, error: %v", err) return nil, err } + infra, ok := obj.(*unstructured.Unstructured) + if !ok { + err := fmt.Errorf("Unable to convert infrastructure reference for %s/%s", namespace, name) + klog.V(4).Infof("%v", err) + return nil, err + } return infra, err } diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller_test.go index f54ca2d0c81d..4db9b31c725f 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller_test.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller_test.go @@ -250,13 +250,13 @@ func mustCreateTestController(t *testing.T, testConfigs ...*testConfig) (*machin } scaleClient.AddReactor("*", "*", scaleReactor) - controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient, scaleClient, cloudprovider.NodeGroupDiscoveryOptions{}) + stopCh := make(chan struct{}) + controller, err := newMachineController(dynamicClientset, kubeclientSet, discoveryClient, scaleClient, cloudprovider.NodeGroupDiscoveryOptions{}, stopCh) if err != nil { t.Fatal("failed to create test controller") } - stopCh := make(chan struct{}) - if err := controller.run(stopCh); err != nil { + if err := controller.run(); err != nil { t.Fatalf("failed to run controller: %v", err) } diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_provider.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_provider.go index 324aeea5bd22..290be55f5384 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_provider.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_provider.go @@ -180,16 +180,16 @@ func BuildClusterAPI(opts config.AutoscalingOptions, do cloudprovider.NodeGroupD klog.Fatalf("create scale client failed: %v", err) } - controller, err := newMachineController(managementClient, workloadClient, managementDiscoveryClient, managementScaleClient, do) - if err != nil { - klog.Fatal(err) - } - // Ideally this would be passed in but the builder is not // currently organised to do so. stopCh := make(chan struct{}) - if err := controller.run(stopCh); err != nil { + controller, err := newMachineController(managementClient, workloadClient, managementDiscoveryClient, managementScaleClient, do, stopCh) + if err != nil { + klog.Fatal(err) + } + + if err := controller.run(); err != nil { klog.Fatal(err) }