diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go
index f78ad247370e..53721371581d 100644
--- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go
+++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go
@@ -19,6 +19,7 @@ package clusterapi
 import (
 	"context"
 	"fmt"
+	"k8s.io/client-go/tools/cache"
 	"path"
 	"sort"
 	"strings"
@@ -425,42 +426,79 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
 			t.Fatalf("unexpected error: %v", err)
 		}
 
-		scalableResource.Spec.Replicas += tc.targetSizeIncrement
+		ch := make(chan error)
+		checkDone := func(obj interface{}) (bool, error) {
+			u, ok := obj.(*unstructured.Unstructured)
+			if !ok {
+				return false, nil
+			}
+			if u.GetResourceVersion() != scalableResource.GetResourceVersion() {
+				return false, nil
+			}
+			ng, err := newNodeGroupFromScalableResource(controller, u)
+			if err != nil {
+				return true, fmt.Errorf("unexpected error: %v", err)
+			}
+			if ng == nil {
+				return false, nil
+			}
+			currReplicas, err := ng.TargetSize()
+			if err != nil {
+				return true, fmt.Errorf("unexpected error: %v", err)
+			}
 
-		_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
-			Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
-		if err != nil {
-			t.Fatalf("unexpected error: %v", err)
-		}
+			if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
+				return true, fmt.Errorf("expected %v, got %v", tc.initial+tc.targetSizeIncrement, currReplicas)
+			}
 
-		// A nodegroup is immutable; get a fresh copy after adding targetSizeIncrement.
-		nodegroups, err = controller.nodeGroups()
-		if err != nil {
-			t.Fatalf("unexpected error: %v", err)
-		}
-		ng = nodegroups[0]
+			if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
+				return true, fmt.Errorf("expected error: %v, got: %v", tc.expectedError, err)
+			}
 
-		currReplicas, err := ng.TargetSize()
-		if err != nil {
-			t.Fatalf("unexpected error: %v", err)
-		}
+			scalableResource, err := controller.managementScaleClient.Scales(testConfig.spec.namespace).
+				Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
+			if err != nil {
+				return true, fmt.Errorf("unexpected error: %v", err)
+			}
 
-		if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
-			t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
+			if scalableResource.Spec.Replicas != tc.expected {
+				return true, fmt.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
+			}
+			return true, nil
 		}
-
-		if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
-			t.Fatalf("expected error: %v, got: %v", tc.expectedError, err)
+		handler := cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				match, err := checkDone(obj)
+				if match {
+					ch <- err
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				match, err := checkDone(newObj)
+				if match {
+					ch <- err
+				}
+			},
 		}
+		controller.machineSetInformer.Informer().AddEventHandler(handler)
+		controller.machineDeploymentInformer.Informer().AddEventHandler(handler)
 
-		scalableResource, err = controller.managementScaleClient.Scales(testConfig.spec.namespace).
-			Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
+		scalableResource.Spec.Replicas += tc.targetSizeIncrement
+
+		_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
+			Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
 
-		if scalableResource.Spec.Replicas != tc.expected {
-			t.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
+		lastErr := fmt.Errorf("no updates received yet")
+		for lastErr != nil {
+			select {
+			case err = <-ch:
+				lastErr = err
+			case <-time.After(1 * time.Second):
+				t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
+			}
 		}
 	}
 
diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured.go
index ad528ea907db..c9b88854d385 100644
--- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured.go
+++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured.go
@@ -80,19 +80,14 @@ func (r unstructuredScalableResource) ProviderIDs() ([]string, error) {
 }
 
 func (r unstructuredScalableResource) Replicas() (int, error) {
-	gvr, err := r.GroupVersionResource()
+	replicas, found, err := unstructured.NestedInt64(r.unstructured.UnstructuredContent(), "spec", "replicas")
 	if err != nil {
-		return 0, err
+		return 0, errors.Wrap(err, "error getting replica count")
 	}
-
-	s, err := r.controller.managementScaleClient.Scales(r.Namespace()).Get(context.TODO(), gvr.GroupResource(), r.Name(), metav1.GetOptions{})
-	if err != nil {
-		return 0, err
+	if !found {
+		replicas = 0
 	}
-	if s == nil {
-		return 0, fmt.Errorf("unknown %s %s/%s", r.Kind(), r.Namespace(), r.Name())
-	}
-	return int(s.Spec.Replicas), nil
+	return int(replicas), nil
 }
 
 func (r unstructuredScalableResource) SetSize(nreplicas int) error {
@@ -119,6 +114,11 @@ func (r unstructuredScalableResource) SetSize(nreplicas int) error {
 
 	s.Spec.Replicas = int32(nreplicas)
 	_, updateErr := r.controller.managementScaleClient.Scales(r.Namespace()).Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
+
+	if updateErr == nil {
+		updateErr = unstructured.SetNestedField(r.unstructured.UnstructuredContent(), int64(nreplicas), "spec", "replicas")
+	}
+
 	return updateErr
 }
 
diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured_test.go
index cc0bb23f84fd..be51c9350f77 100644
--- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured_test.go
+++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_unstructured_test.go
@@ -18,9 +18,12 @@ package clusterapi
 
 import (
 	"context"
-	"testing"
-
+	"fmt"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/client-go/tools/cache"
+	"testing"
+	"time"
 )
 
 func TestSetSize(t *testing.T) {
@@ -125,19 +128,57 @@ func TestReplicas(t *testing.T) {
 
 		s.Spec.Replicas = int32(updatedReplicas)
 
-		_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
-			Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
-		if err != nil {
-			t.Fatal(err)
+		ch := make(chan error)
+		checkDone := func(obj interface{}) (bool, error) {
+			u, ok := obj.(*unstructured.Unstructured)
+			if !ok {
+				return false, nil
+			}
+			sr, err := newUnstructuredScalableResource(controller, u)
+			if err != nil {
+				return true, err
+			}
+			i, err := sr.Replicas()
+			if err != nil {
+				return true, err
+			}
+			if i != updatedReplicas {
+				return true, fmt.Errorf("expected %v, got: %v", updatedReplicas, i)
+			}
+			return true, nil
+		}
+		handler := cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				match, err := checkDone(obj)
+				if match {
+					ch <- err
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				match, err := checkDone(newObj)
+				if match {
+					ch <- err
+				}
+			},
 		}
 
-		i, err = sr.Replicas()
+		controller.machineSetInformer.Informer().AddEventHandler(handler)
+		controller.machineDeploymentInformer.Informer().AddEventHandler(handler)
+
+		_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
+			Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
 		if err != nil {
 			t.Fatal(err)
 		}
 
-		if i != updatedReplicas {
-			t.Errorf("expected %v, got: %v", updatedReplicas, i)
+		lastErr := fmt.Errorf("no updates received yet")
+		for lastErr != nil {
+			select {
+			case err = <-ch:
+				lastErr = err
+			case <-time.After(1 * time.Second):
+				t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
+			}
 		}
 	}
 
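For reference, here is a minimal standalone sketch of the unstructured.NestedInt64 / unstructured.SetNestedField pattern that the new Replicas() and SetSize() bodies rely on. The MachineDeployment-shaped object, its apiVersion, names, and replica values below are hand-built stand-ins for illustration only, not the provider's informer-backed objects or the test fixtures in this patch:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Hand-built stand-in for a scalable cluster-api object; the provider
	// normally works on copies served by its dynamic informers.
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "cluster.x-k8s.io/v1beta1",
		"kind":       "MachineDeployment",
		"metadata":   map[string]interface{}{"name": "md-1", "namespace": "default"},
		"spec":       map[string]interface{}{"replicas": int64(3)},
	}}

	// As in the new Replicas(): read spec.replicas from the local object
	// instead of issuing a GET against the scale subresource.
	replicas, found, err := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
	if err != nil {
		panic(err)
	}
	if !found {
		replicas = 0 // a missing field is treated as zero replicas
	}
	fmt.Println("replicas:", replicas)

	// As in the new SetSize(): after a successful Scales().Update(), mirror the
	// new value into the local copy so later Replicas() calls see it immediately.
	if err := unstructured.SetNestedField(u.UnstructuredContent(), int64(5), "spec", "replicas"); err != nil {
		panic(err)
	}
	fmt.Println("spec after update:", u.Object["spec"])
}

The comments map each step to the method it corresponds to in clusterapi_unstructured.go; keeping the cached copy in sync after the scale update is what allows Replicas() to answer from the unstructured object without another round trip through the scale client.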