From 7871f5871f30b66340a9a490377133335107a04c Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 31 May 2019 01:50:11 +0200 Subject: [PATCH 1/4] Always create missing statefulsets Add tests for creating missing stateful sets Rename test refactor test --- pkg/controller/add_cluster_test.go | 5 +- pkg/controller/common_test.go | 14 +- pkg/controller/controller.go | 119 +++++++++-------- pkg/controller/controller_test.go | 177 +++++++++++++++++++++++++- pkg/controller/update_cluster_test.go | 51 ++++---- 5 files changed, 277 insertions(+), 89 deletions(-) diff --git a/pkg/controller/add_cluster_test.go b/pkg/controller/add_cluster_test.go index f896fc07..7a835d4b 100644 --- a/pkg/controller/add_cluster_test.go +++ b/pkg/controller/add_cluster_test.go @@ -76,11 +76,11 @@ func TestEnsureService_Base(t *testing.T) { func TestEnsureConfigMap(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) deps := newTestDeps(t, &testOpts{}) + controller := deps.newController(t) defer deps.cleanup() require.NoError(t, registerValidConfigMap("my_config_data")) - controller := deps.newController() err := controller.ensureConfigMap(cluster) assert.NoError(t, err) @@ -99,12 +99,11 @@ func TestEnsureConfigMap(t *testing.T) { func TestEnsureConfigMap_Update(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) deps := newTestDeps(t, &testOpts{}) + controller := deps.newController(t) defer deps.cleanup() require.NoError(t, registerValidConfigMap("my_config_data")) - controller := deps.newController() - err := controller.ensureConfigMap(cluster) assert.NoError(t, err) diff --git a/pkg/controller/common_test.go b/pkg/controller/common_test.go index d982e1df..5f4f76d9 100644 --- a/pkg/controller/common_test.go +++ b/pkg/controller/common_test.go @@ -28,6 +28,7 @@ import ( crdfake "github.com/m3db/m3db-operator/pkg/client/clientset/versioned/fake" crdinformers "github.com/m3db/m3db-operator/pkg/client/informers/externalversions" crdlisters "github.com/m3db/m3db-operator/pkg/client/listers/m3dboperator/v1alpha1" + "github.com/m3db/m3db-operator/pkg/k8sops" "github.com/m3db/m3db-operator/pkg/k8sops/podidentity" "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/m3admin/placement" @@ -70,7 +71,8 @@ type testDeps struct { closed int32 } -func (deps *testDeps) newController() *Controller { +func (deps *testDeps) newController(t *testing.T) *Controller { + logger := zap.NewNop() m := newMultiAdminClient(nil, zap.NewNop()) m.nsClientFn = func(...namespace.Option) (namespace.Client, error) { return deps.namespaceClient, nil @@ -78,12 +80,20 @@ func (deps *testDeps) newController() *Controller { m.plClientFn = func(...placement.Option) (placement.Client, error) { return deps.placementClient, nil } + k8sopsClient, err := k8sops.New( + k8sops.WithKClient(deps.kubeClient), + k8sops.WithCRDClient(deps.crdClient), + k8sops.WithLogger(logger)) + + require.NoError(t, err) + return &Controller{ - logger: zap.NewNop(), + logger: logger, scope: tally.NoopScope, clock: deps.clock, adminClient: m, + k8sclient: k8sopsClient, kubeClient: deps.kubeClient, crdClient: deps.crdClient, podIDProvider: deps.idProvider, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d3a342fa..9e44bf43 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -71,6 +71,14 @@ var ( errNonUniqueIsoGroups = errors.New("isolation group names are not unique") ) +type handleClusterUpdateResult uint + 
+const ( + noopClusterUpdate handleClusterUpdateResult = iota + waitClusterUpdate + actionClusterUpdate +) + // Configuration contains parameters for the controller. type Configuration struct { // ManageCRD indicates whether the controller should create and update specs @@ -326,12 +334,15 @@ func (c *Controller) handleClusterEvent(key string) error { return errors.New("got nil cluster for " + key) } - return c.handleClusterUpdate(cluster) + _, err = c.handleClusterUpdate(cluster) + return err } // We are guaranteed by handleClusterEvent that we will never be passed a nil // cluster here. -func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { +func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClusterUpdateResult, error) { + var result handleClusterUpdateResult + // MUST create a deep copy of the cluster or risk corrupting cache! Technically // only need if we modify, but we frequently do that so let's deep copy to // start and remove unnecessary calls later to optimize if we want. @@ -347,7 +358,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { if dts := cluster.ObjectMeta.DeletionTimestamp; dts != nil && !dts.IsZero() { if !stringArrayContains(cluster.Finalizers, labels.EtcdDeletionFinalizer) { clusterLogger.Info("no etcd finalizer on cluster, nothing to do") - return nil + return result, nil } // If cluster is set to preserve data, jump straight to removing the @@ -357,54 +368,54 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { } else { if err := c.deleteAllNamespaces(cluster); err != nil { clusterLogger.Error("error deleting cluster namespaces", zap.Error(err)) - return err + return result, err } if err := c.deletePlacement(cluster); err != nil { clusterLogger.Error("error deleting cluster placement", zap.Error(err)) - return err + return result, err } } if _, err := c.removeEtcdFinalizer(cluster); err != nil { clusterLogger.Error("error deleting etcd finalizer", zap.Error(err)) - return pkgerrors.WithMessage(err, "error removing etcd cluster finalizer") + return result, pkgerrors.WithMessage(err, "error removing etcd cluster finalizer") } // Exit the control loop once the cluster is deleted and cleaned up. clusterLogger.Info("completed finalizer cleanup") - return nil + return result, nil } if err := validateIsolationGroups(cluster); err != nil { clusterLogger.Error("failed validating isolationgroups", zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, err.Error()) - return err + return result, err } if !cluster.Spec.KeepEtcdDataOnDelete { var err error cluster, err = c.ensureEtcdFinalizer(cluster) if err != nil { - return err + return result, err } } if err := c.ensureConfigMap(cluster); err != nil { clusterLogger.Error("failed to ensure configmap", zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to ensure configmap: %s", err.Error()) - return err + return result, err } // Per https://v1-10.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#statefulsetspec-v1-apps, // headless service MUST exist before statefulset. 
if err := c.ensureServices(cluster); err != nil { - return err + return result, err } if len(cluster.Spec.IsolationGroups) == 0 { // nothing to do, no groups to create in - return nil + return result, nil } // copy since we sort the array @@ -414,45 +425,47 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { childrenSets, err := c.getChildStatefulSets(cluster) if err != nil { - return err + return result, err } + childrenSetsByName := make(map[string]*appsv1.StatefulSet) for _, sts := range childrenSets { + childrenSetsByName[sts.Name] = sts // if any of the statefulsets aren't ready, wait until they are as we'll get // another event (ready == bootstrapped) if sts.Spec.Replicas != nil && *sts.Spec.Replicas != sts.Status.ReadyReplicas { // TODO(schallert): figure out what to do if replicas is not set c.logger.Info("waiting for statefulset to be ready", zap.String("name", sts.Name), zap.Int32("ready", sts.Status.ReadyReplicas)) - return nil + result = waitClusterUpdate + return result, nil } } - // At this point all existing statefulsets are bootstrapped. - - if len(childrenSets) != len(isoGroups) { - nextID := len(childrenSets) - // create a statefulset + // Create any missing statefulsets, at this point all existing stateful sets are bootstrapped. + for i := 0; i < len(isoGroups); i++ { + name := k8sops.StatefulSetName(cluster.Name, i) + _, exists := childrenSetsByName[name] + if !exists { + sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[i].Name, isoGroups[i].NumInstances) + if err != nil { + return result, err + } - name := fmt.Sprintf("%s-%d", cluster.Name, nextID) - sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[nextID].Name, isoGroups[nextID].NumInstances) - if err != nil { - return err - } + _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts) + if err != nil { + c.logger.Error(err.Error()) + return result, err + } - _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts) - if err != nil { - c.logger.Error(err.Error()) - return err + c.logger.Info("created statefulset", zap.String("name", name)) + return actionClusterUpdate, nil } - - c.logger.Info("created statefulset", zap.String("name", name)) - return nil } if err := c.reconcileNamespaces(cluster); err != nil { c.recorder.WarningEvent(cluster, eventer.ReasonFailedCreate, "failed to create namespace: %s", err) c.logger.Error("error reconciling namespaces", zap.Error(err)) - return err + return result, err } if len(cluster.Spec.Namespaces) == 0 { @@ -463,7 +476,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { if !cluster.Status.HasInitializedPlacement() { cluster, err = c.validatePlacementWithStatus(cluster) if err != nil { - return err + return result, err } } @@ -473,12 +486,12 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { selector := klabels.SelectorFromSet(labels.BaseLabels(cluster)) pods, err := c.podLister.Pods(cluster.Namespace).List(selector) if err != nil { - return fmt.Errorf("error listing pods: %v", err) + return result, fmt.Errorf("error listing pods: %v", err) } placement, err := c.adminClient.placementClientForCluster(cluster).Get() if err != nil { - return fmt.Errorf("error fetching active placement: %v", err) + return result, fmt.Errorf("error fetching active placement: %v", err) } c.logger.Info("found placement", zap.Int("currentPods", len(pods)), zap.Int("placementInsts", placement.NumInstances())) @@ -488,33 +501,33 @@ func (c *Controller) 
handleClusterUpdate(cluster *myspec.M3DBCluster) error { if !inst.IsAvailable() { unavailInsts = append(unavailInsts, inst.ID()) } - } if ln := len(unavailInsts); ln > 0 { c.logger.Warn("waiting for instances to be available", zap.Strings("instances", unavailInsts)) c.recorder.WarningEvent(cluster, eventer.ReasonLongerThanUsual, "current unavailable instances: %d", ln) - return nil + result = waitClusterUpdate + return result, nil } // Determine if any sets aren't at their desired replica count. Maybe we can // reuse the set objects from above but being paranoid for now. childrenSets, err = c.getChildStatefulSets(cluster) if err != nil { - return err + return result, nil } // check if any pods inside the cluster need to be swapped in leavingInstanceID, podToReplace, err := c.checkPodsForReplacement(cluster, pods, placement) if err != nil { - return err + return result, nil } if podToReplace != nil { err = c.replacePodInPlacement(cluster, placement, leavingInstanceID, podToReplace) if err != nil { c.recorder.WarningEvent(cluster, eventer.ReasonFailedToUpdate, "could not replace instance: "+leavingInstanceID) - return err + return result, err } c.recorder.NormalEvent(cluster, eventer.ReasonSuccessfulUpdate, "successfully replaced instance: "+leavingInstanceID) } @@ -522,16 +535,16 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { for _, set := range childrenSets { zone, ok := set.Labels[labels.IsolationGroup] if !ok { - return fmt.Errorf("statefulset %s has no isolation-group label", set.Name) + return result, fmt.Errorf("statefulset %s has no isolation-group label", set.Name) } group, ok := myspec.IsolationGroups(isoGroups).GetByName(zone) if !ok { - return fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups) + return result, fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups) } if set.Spec.Replicas == nil { - return fmt.Errorf("set %s has unset spec replica", set.Name) + return result, fmt.Errorf("set %s has unset spec replica", set.Name) } // Number of pods we want in the group. @@ -559,7 +572,10 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { // absent from the placement, add pods to placement. if inPlacement < current { setLogger.Info("expanding placement for set") - return c.expandPlacementForSet(cluster, set, group, placement) + if err := c.expandPlacementForSet(cluster, set, group, placement); err != nil { + return result, err + } + return actionClusterUpdate, nil } } @@ -567,7 +583,10 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { // trigger a remove so that we can shrink the set. 
if inPlacement > desired { setLogger.Info("remove instance from placement for set") - return c.shrinkPlacementForSet(cluster, set, placement) + if err := c.shrinkPlacementForSet(cluster, set, placement); err != nil { + return result, err + } + return actionClusterUpdate, nil } var newCount int32 @@ -581,15 +600,15 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { set.Spec.Replicas = pointer.Int32Ptr(newCount) set, err = c.kubeClient.AppsV1().StatefulSets(set.Namespace).Update(set) if err != nil { - return fmt.Errorf("error updating statefulset %s: %v", set.Name, err) + return result, fmt.Errorf("error updating statefulset %s: %v", set.Name, err) } - return nil + return actionClusterUpdate, nil } placement, err = c.adminClient.placementClientForCluster(cluster).Get() if err != nil { - return fmt.Errorf("error fetching placement: %v", err) + return result, fmt.Errorf("error fetching placement: %v", err) } // TODO(celina): possibly do a replacement check here @@ -597,7 +616,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { // See if we need to clean up the pod bootstrapping status. cluster, err = c.reconcileBootstrappingStatus(cluster, placement) if err != nil { - return fmt.Errorf("error reconciling bootstrap status: %v", err) + return result, fmt.Errorf("error reconciling bootstrap status: %v", err) } c.logger.Info("nothing to do", @@ -606,7 +625,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { zap.Int64("generation", cluster.ObjectMeta.Generation), zap.String("rv", cluster.ObjectMeta.ResourceVersion)) - return nil + return noopClusterUpdate, nil } func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 8f5d5b8a..c1b3038e 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -21,6 +21,7 @@ package controller import ( + "fmt" "sort" "testing" "time" @@ -29,6 +30,7 @@ import ( clientsetfake "github.com/m3db/m3db-operator/pkg/client/clientset/versioned/fake" m3dbinformers "github.com/m3db/m3db-operator/pkg/client/informers/externalversions" "github.com/m3db/m3db-operator/pkg/k8sops" + "github.com/m3db/m3db-operator/pkg/k8sops/labels" "github.com/m3db/m3db-operator/pkg/k8sops/podidentity" "github.com/m3db/m3/src/cluster/placement" @@ -41,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" + kubetesting "k8s.io/client-go/testing" "github.com/golang/mock/gomock" pkgerrors "github.com/pkg/errors" @@ -139,7 +142,7 @@ func TestGetChildStatefulSets(t *testing.T) { kubeObjects: objects, }) - c := deps.newController() + c := deps.newController(t) // Ensure that with no owner references we don't act on these sets children, err := c.getChildStatefulSets(cluster) @@ -237,10 +240,9 @@ func TestGetParentCluster(t *testing.T) { cluster, }, }) + c := deps.newController(t) defer deps.cleanup() - c := deps.newController() - pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "namespace", @@ -277,8 +279,8 @@ func TestHandlePodUpdate(t *testing.T) { crdObjects: []runtime.Object{cluster}, kubeObjects: []runtime.Object{pod1}, }) + c := deps.newController(t) defer deps.cleanup() - c := deps.newController() mockID := &myspec.PodIdentity{ Name: "pod1", @@ -302,7 +304,8 @@ func TestHandlePodUpdate(t 
*testing.T) { func TestClusterEventLoop(t *testing.T) { deps := newTestDeps(t, &testOpts{}) defer deps.cleanup() - c := deps.newController() + + c := deps.newController(t) cluster := &myspec.M3DBCluster{ ObjectMeta: newObjectMeta("foo", nil), @@ -335,7 +338,8 @@ func TestClusterEventLoop(t *testing.T) { func TestPodEventLoop(t *testing.T) { deps := newTestDeps(t, &testOpts{}) defer deps.cleanup() - c := deps.newController() + + c := deps.newController(t) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -415,3 +419,164 @@ func TestValidateIsolationGroups(t *testing.T) { } } } + +func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { + tests := []struct { + name string + cluster *metav1.ObjectMeta + sets []*metav1.ObjectMeta + replicationFactor int + expCreateStatefulSets []string + }{ + { + name: "creates missing stateful sets at tail", + cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + sets: []*metav1.ObjectMeta{ + newMeta("cluster1-rep0", nil), + }, + replicationFactor: 3, + expCreateStatefulSets: []string{"cluster1-rep1", "cluster1-rep2"}, + }, + { + name: "creates missing stateful sets at head and tail", + cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + sets: []*metav1.ObjectMeta{ + newMeta("cluster1-rep1", nil), + }, + replicationFactor: 3, + expCreateStatefulSets: []string{"cluster1-rep0", "cluster1-rep2"}, + }, + { + name: "creates missing stateful sets at head", + cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + sets: []*metav1.ObjectMeta{ + newMeta("cluster1-rep2", nil), + }, + replicationFactor: 3, + expCreateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cfgMapName := "configMapName" + cluster := &myspec.M3DBCluster{ + ObjectMeta: *test.cluster, + Spec: myspec.ClusterSpec{ + ReplicationFactor: int32(test.replicationFactor), + ConfigMapName: &cfgMapName, + }, + } + cluster.ObjectMeta.UID = "abcd" + for i := 0; i < test.replicationFactor; i++ { + group := myspec.IsolationGroup{ + Name: fmt.Sprintf("group%d", i), + NumInstances: 1, + } + cluster.Spec.IsolationGroups = append(cluster.Spec.IsolationGroups, group) + } + + objects := make([]runtime.Object, len(test.sets)) + statefulSets := make([]*appsv1.StatefulSet, len(test.sets)) + for i, s := range test.sets { + set := &appsv1.StatefulSet{ + ObjectMeta: *s, + } + // Apply base labels + if set.ObjectMeta.Labels == nil { + set.ObjectMeta.Labels = make(map[string]string) + } + for k, v := range labels.BaseLabels(cluster) { + set.ObjectMeta.Labels[k] = v + } + statefulSets[i] = set + objects[i] = set + } + + deps := newTestDeps(t, &testOpts{ + kubeObjects: objects, + }) + defer deps.cleanup() + + for _, set := range statefulSets { + // Now set the owner refs and ensure we pick up the sets + set.SetOwnerReferences([]metav1.OwnerReference{ + *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ + Group: myspec.SchemeGroupVersion.Group, + Version: myspec.SchemeGroupVersion.Version, + Kind: "m3dbcluster", + }), + }) + // Also set the number of ready replicas to as expected + replicas := int32(1) + set.Spec.Replicas = &replicas + set.Status.ReadyReplicas = replicas + _, err := deps.kubeClient.AppsV1().StatefulSets("namespace").Update(set) + assert.NoError(t, err) + } + + c := deps.newController(t) + + // Keep running cluster updates until no more stateful sets required + expectedSetsCreated := make(map[string]bool) + for _, name := range test.expCreateStatefulSets { + 
expectedSetsCreated[name] = false + } + for { + created := 0 + for _, v := range expectedSetsCreated { + if v { + created++ + } + } + if created == len(expectedSetsCreated) { + // All created + break + } + + result, err := c.handleClusterUpdate(cluster) + require.NoError(t, err) + require.Equal(t, actionClusterUpdate, result) + + for _, action := range deps.kubeClient.Actions() { + created, ok := action.(kubetesting.CreateActionImpl) + if !ok { + continue + } + switch { + case created.Verb == "create" && + created.Resource == schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "statefulsets", + }: + + set, ok := created.Object.(*appsv1.StatefulSet) + require.True(t, ok) + + // Track we created it + expectedSetsCreated[set.Name] = true + + // Set owning references for newly created stateful set + set.SetOwnerReferences([]metav1.OwnerReference{ + *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ + Group: myspec.SchemeGroupVersion.Group, + Version: myspec.SchemeGroupVersion.Version, + Kind: "m3dbcluster", + }), + }) + + // Also set the number of ready replicas to as expected + replicas := int32(1) + set.Spec.Replicas = &replicas + set.Status.ReadyReplicas = replicas + + _, err := deps.kubeClient.AppsV1().StatefulSets("namespace").Update(set) + require.NoError(t, err) + } + } + deps.kubeClient.ClearActions() + } + }) + } +} diff --git a/pkg/controller/update_cluster_test.go b/pkg/controller/update_cluster_test.go index 9661b9ff..4836882f 100644 --- a/pkg/controller/update_cluster_test.go +++ b/pkg/controller/update_cluster_test.go @@ -70,13 +70,11 @@ func (namespaceMatcher) String() string { func TestReconcileNamespaces(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) - deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) nsMock := deps.namespaceClient - - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() registry := &dbns.Registry{ @@ -104,8 +102,7 @@ func TestPruneNamespaces(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) nsMock := deps.namespaceClient - - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() registry := &dbns.Registry{Namespaces: map[string]*dbns.NamespaceOptions{ @@ -142,8 +139,7 @@ func TestCreateNamespaces(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) nsMock := deps.namespaceClient - - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() registry := &dbns.Registry{Namespaces: map[string]*dbns.NamespaceOptions{}} @@ -246,7 +242,7 @@ func TestSetPodBootstrappingStatus(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() cluster, err := controller.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar") @@ -263,7 +259,7 @@ func TestSetStatus(t *testing.T) { crdObjects: []runtime.Object{cluster}, clock: fakeClock, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() const cond = myspec.ClusterConditionPlacementInitialized @@ -299,7 +295,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() const cond = myspec.ClusterConditionPodBootstrapping @@ -314,8 +310,7 @@ func 
TestReconcileBootstrappingStatus(t *testing.T) { pl := newPl(shard.Initializing) - var err error - cluster, err = controller.reconcileBootstrappingStatus(cluster, pl) + cluster, err := controller.reconcileBootstrappingStatus(cluster, pl) assert.NoError(t, err) _, ok := cluster.Status.GetCondition(cond) assert.False(t, ok) @@ -335,7 +330,7 @@ func TestAddPodToPlacement(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() pod := &corev1.Pod{ @@ -471,7 +466,7 @@ func TestExpandPlacementForSet(t *testing.T) { }) placementMock := deps.placementClient idProvider := deps.idProvider - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() identifyPods(idProvider, pods, nil) @@ -493,7 +488,7 @@ func TestExpandPlacementForSet(t *testing.T) { func TestExpandPlacementForSet_Nop(t *testing.T) { deps := newTestDeps(t, &testOpts{}) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -515,7 +510,7 @@ func TestExpandPlacementForSet_Nop(t *testing.T) { func TestExpandPlacementForSet_Err(t *testing.T) { deps := newTestDeps(t, &testOpts{}) idProvider := deps.idProvider - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() cluster := getFixture("cluster-3-zones.yaml", t) @@ -543,7 +538,7 @@ func TestShrinkPlacementForSet(t *testing.T) { kubeObjects: objectsFromPods(pods...), }) placementMock := deps.placementClient - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() identifyPods(deps.idProvider, pods, nil) @@ -579,7 +574,7 @@ func TestValidatePlacementWithStatus(t *testing.T) { placementMock := deps.placementClient defer deps.cleanup() - controller := deps.newController() + controller := deps.newController(t) placementMock.EXPECT().Get().AnyTimes() @@ -625,7 +620,7 @@ func TestValidatePlacementWithStatus_ErrNotFound(t *testing.T) { placementMock := deps.placementClient defer deps.cleanup() - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider expInsts := []string{ @@ -736,7 +731,7 @@ func TestCheckPodsForReplacement(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -782,7 +777,7 @@ func TestReplacePodInPlacement(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -836,7 +831,7 @@ func TestReplacePodInPlacementWithError(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -903,7 +898,7 @@ func TestFindPodInPlacement(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -940,7 +935,7 @@ func TestFindPodToRemove(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) idProvider := deps.idProvider defer deps.cleanup() @@ -986,7 +981,7 @@ func TestEtcdFinalizer(t *testing.T) { crdObjects: []runtime.Object{cluster}, }) - controller 
:= deps.newController() + controller := deps.newController(t) defer deps.cleanup() // Mock the API to return errors so we know we don't hit in in @@ -1050,7 +1045,7 @@ func TestDeletePlacement(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() deps.placementClient.EXPECT().Get().Return(nil, errors.New("TEST")) @@ -1088,7 +1083,7 @@ func TestDeleteAllNamespaces(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() deps.namespaceClient.EXPECT().List().Return(nil, errors.New("TEST")) @@ -1113,7 +1108,7 @@ func TestDeleteAllNamespaces(t *testing.T) { deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, }) - controller := deps.newController() + controller := deps.newController(t) defer deps.cleanup() deps.namespaceClient.EXPECT().List().Return(testResp, nil) From df0119aa2742c5fb34fc11f0f10cf262fcc02a5c Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 19 Jun 2019 22:16:10 -0400 Subject: [PATCH 2/4] fix racy test --- pkg/controller/controller_test.go | 128 +++++++++++++----------------- 1 file changed, 54 insertions(+), 74 deletions(-) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index c1b3038e..77305c97 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -23,6 +23,7 @@ package controller import ( "fmt" "sort" + "sync" "testing" "time" @@ -43,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" kubetesting "k8s.io/client-go/testing" "github.com/golang/mock/gomock" @@ -429,8 +431,12 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { expCreateStatefulSets []string }{ { - name: "creates missing stateful sets at tail", - cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + name: "creates missing stateful sets at tail", + cluster: newMeta("cluster1", map[string]string{ + "foo": "bar", + "operator.m3db.io/app": "m3db", + "operator.m3db.io/cluster": "cluster1", + }), sets: []*metav1.ObjectMeta{ newMeta("cluster1-rep0", nil), }, @@ -438,8 +444,12 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { expCreateStatefulSets: []string{"cluster1-rep1", "cluster1-rep2"}, }, { - name: "creates missing stateful sets at head and tail", - cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + name: "creates missing stateful sets at head and tail", + cluster: newMeta("cluster1", map[string]string{ + "foo": "bar", + "operator.m3db.io/app": "m3db", + "operator.m3db.io/cluster": "cluster1", + }), sets: []*metav1.ObjectMeta{ newMeta("cluster1-rep1", nil), }, @@ -447,8 +457,12 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { expCreateStatefulSets: []string{"cluster1-rep0", "cluster1-rep2"}, }, { - name: "creates missing stateful sets at head", - cluster: newMeta("cluster1", map[string]string{"foo": "bar"}), + name: "creates missing stateful sets at head", + cluster: newMeta("cluster1", map[string]string{ + "foo": "bar", + "operator.m3db.io/app": "m3db", + "operator.m3db.io/cluster": "cluster1", + }), sets: []*metav1.ObjectMeta{ newMeta("cluster1-rep2", nil), }, @@ -475,6 +489,7 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { } 
cluster.Spec.IsolationGroups = append(cluster.Spec.IsolationGroups, group) } + cluster.ObjectMeta.Finalizers = []string{labels.EtcdDeletionFinalizer} objects := make([]runtime.Object, len(test.sets)) statefulSets := make([]*appsv1.StatefulSet, len(test.sets)) @@ -491,92 +506,57 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { } statefulSets[i] = set objects[i] = set - } - - deps := newTestDeps(t, &testOpts{ - kubeObjects: objects, - }) - defer deps.cleanup() - - for _, set := range statefulSets { - // Now set the owner refs and ensure we pick up the sets - set.SetOwnerReferences([]metav1.OwnerReference{ + set.OwnerReferences = []metav1.OwnerReference{ *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ Group: myspec.SchemeGroupVersion.Group, Version: myspec.SchemeGroupVersion.Version, Kind: "m3dbcluster", }), - }) - // Also set the number of ready replicas to as expected - replicas := int32(1) - set.Spec.Replicas = &replicas - set.Status.ReadyReplicas = replicas - _, err := deps.kubeClient.AppsV1().StatefulSets("namespace").Update(set) - assert.NoError(t, err) + } } + deps := newTestDeps(t, &testOpts{ + crdObjects: []runtime.Object{ + &myspec.M3DBCluster{ + ObjectMeta: *test.cluster, + }, + }, + kubeObjects: objects, + }) + defer deps.cleanup() c := deps.newController(t) // Keep running cluster updates until no more stateful sets required + var expectedMu sync.Mutex expectedSetsCreated := make(map[string]bool) for _, name := range test.expCreateStatefulSets { expectedSetsCreated[name] = false } - for { - created := 0 - for _, v := range expectedSetsCreated { - if v { - created++ - } - } - if created == len(expectedSetsCreated) { - // All created - break - } - result, err := c.handleClusterUpdate(cluster) + c.kubeClient.(*kubefake.Clientset).PrependReactor("create", "statefulsets", func(action ktesting.Action) (bool, runtime.Object, error) { + cluster := action.(kubetesting.CreateActionImpl).GetObject().(*appsv1.StatefulSet) + name := cluster.Name + expectedMu.Lock() + expectedSetsCreated[name] = true + expectedMu.Unlock() + return true, cluster, nil + }) + + var done bool + for i := 0; i < 5; i++ { + _, err := c.handleClusterUpdate(cluster) require.NoError(t, err) - require.Equal(t, actionClusterUpdate, result) - - for _, action := range deps.kubeClient.Actions() { - created, ok := action.(kubetesting.CreateActionImpl) - if !ok { - continue - } - switch { - case created.Verb == "create" && - created.Resource == schema.GroupVersionResource{ - Group: "apps", - Version: "v1", - Resource: "statefulsets", - }: - - set, ok := created.Object.(*appsv1.StatefulSet) - require.True(t, ok) - - // Track we created it - expectedSetsCreated[set.Name] = true - - // Set owning references for newly created stateful set - set.SetOwnerReferences([]metav1.OwnerReference{ - *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ - Group: myspec.SchemeGroupVersion.Group, - Version: myspec.SchemeGroupVersion.Version, - Kind: "m3dbcluster", - }), - }) - - // Also set the number of ready replicas to as expected - replicas := int32(1) - set.Spec.Replicas = &replicas - set.Status.ReadyReplicas = replicas - - _, err := deps.kubeClient.AppsV1().StatefulSets("namespace").Update(set) - require.NoError(t, err) - } + + expectedMu.Lock() + created := len(expectedSetsCreated) + expectedMu.Unlock() + if created != len(test.expCreateStatefulSets) { + time.Sleep(100 * time.Millisecond) + continue } - deps.kubeClient.ClearActions() + done = true } + assert.True(t, done, "expected all sets to be 
created") }) } } From b9a4f6fac0ac7ab7395f0770c302edf239862bb1 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 19 Jun 2019 23:38:19 -0400 Subject: [PATCH 3/4] replace cluster action w/ k8s reactor --- pkg/controller/controller.go | 89 ++++++++++++------------------- pkg/controller/controller_test.go | 2 +- 2 files changed, 36 insertions(+), 55 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9e44bf43..11b1d32f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -71,14 +71,6 @@ var ( errNonUniqueIsoGroups = errors.New("isolation group names are not unique") ) -type handleClusterUpdateResult uint - -const ( - noopClusterUpdate handleClusterUpdateResult = iota - waitClusterUpdate - actionClusterUpdate -) - // Configuration contains parameters for the controller. type Configuration struct { // ManageCRD indicates whether the controller should create and update specs @@ -334,15 +326,12 @@ func (c *Controller) handleClusterEvent(key string) error { return errors.New("got nil cluster for " + key) } - _, err = c.handleClusterUpdate(cluster) - return err + return c.handleClusterUpdate(cluster) } // We are guaranteed by handleClusterEvent that we will never be passed a nil // cluster here. -func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClusterUpdateResult, error) { - var result handleClusterUpdateResult - +func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { // MUST create a deep copy of the cluster or risk corrupting cache! Technically // only need if we modify, but we frequently do that so let's deep copy to // start and remove unnecessary calls later to optimize if we want. @@ -358,7 +347,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu if dts := cluster.ObjectMeta.DeletionTimestamp; dts != nil && !dts.IsZero() { if !stringArrayContains(cluster.Finalizers, labels.EtcdDeletionFinalizer) { clusterLogger.Info("no etcd finalizer on cluster, nothing to do") - return result, nil + return nil } // If cluster is set to preserve data, jump straight to removing the @@ -368,54 +357,54 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu } else { if err := c.deleteAllNamespaces(cluster); err != nil { clusterLogger.Error("error deleting cluster namespaces", zap.Error(err)) - return result, err + return err } if err := c.deletePlacement(cluster); err != nil { clusterLogger.Error("error deleting cluster placement", zap.Error(err)) - return result, err + return err } } if _, err := c.removeEtcdFinalizer(cluster); err != nil { clusterLogger.Error("error deleting etcd finalizer", zap.Error(err)) - return result, pkgerrors.WithMessage(err, "error removing etcd cluster finalizer") + return pkgerrors.WithMessage(err, "error removing etcd cluster finalizer") } // Exit the control loop once the cluster is deleted and cleaned up. 
clusterLogger.Info("completed finalizer cleanup") - return result, nil + return nil } if err := validateIsolationGroups(cluster); err != nil { clusterLogger.Error("failed validating isolationgroups", zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, err.Error()) - return result, err + return err } if !cluster.Spec.KeepEtcdDataOnDelete { var err error cluster, err = c.ensureEtcdFinalizer(cluster) if err != nil { - return result, err + return err } } if err := c.ensureConfigMap(cluster); err != nil { clusterLogger.Error("failed to ensure configmap", zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to ensure configmap: %s", err.Error()) - return result, err + return err } // Per https://v1-10.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#statefulsetspec-v1-apps, // headless service MUST exist before statefulset. if err := c.ensureServices(cluster); err != nil { - return result, err + return err } if len(cluster.Spec.IsolationGroups) == 0 { // nothing to do, no groups to create in - return result, nil + return nil } // copy since we sort the array @@ -425,7 +414,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu childrenSets, err := c.getChildStatefulSets(cluster) if err != nil { - return result, err + return err } childrenSetsByName := make(map[string]*appsv1.StatefulSet) @@ -436,8 +425,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu if sts.Spec.Replicas != nil && *sts.Spec.Replicas != sts.Status.ReadyReplicas { // TODO(schallert): figure out what to do if replicas is not set c.logger.Info("waiting for statefulset to be ready", zap.String("name", sts.Name), zap.Int32("ready", sts.Status.ReadyReplicas)) - result = waitClusterUpdate - return result, nil + return nil } } @@ -448,24 +436,24 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu if !exists { sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[i].Name, isoGroups[i].NumInstances) if err != nil { - return result, err + return err } _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts) if err != nil { c.logger.Error(err.Error()) - return result, err + return err } c.logger.Info("created statefulset", zap.String("name", name)) - return actionClusterUpdate, nil + return nil } } if err := c.reconcileNamespaces(cluster); err != nil { c.recorder.WarningEvent(cluster, eventer.ReasonFailedCreate, "failed to create namespace: %s", err) c.logger.Error("error reconciling namespaces", zap.Error(err)) - return result, err + return err } if len(cluster.Spec.Namespaces) == 0 { @@ -476,7 +464,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu if !cluster.Status.HasInitializedPlacement() { cluster, err = c.validatePlacementWithStatus(cluster) if err != nil { - return result, err + return err } } @@ -486,12 +474,12 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu selector := klabels.SelectorFromSet(labels.BaseLabels(cluster)) pods, err := c.podLister.Pods(cluster.Namespace).List(selector) if err != nil { - return result, fmt.Errorf("error listing pods: %v", err) + return fmt.Errorf("error listing pods: %v", err) } placement, err := c.adminClient.placementClientForCluster(cluster).Get() if err != nil { - return result, fmt.Errorf("error fetching active placement: %v", err) + return fmt.Errorf("error fetching active placement: %v", err) } c.logger.Info("found placement", 
zap.Int("currentPods", len(pods)), zap.Int("placementInsts", placement.NumInstances())) @@ -506,28 +494,27 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu if ln := len(unavailInsts); ln > 0 { c.logger.Warn("waiting for instances to be available", zap.Strings("instances", unavailInsts)) c.recorder.WarningEvent(cluster, eventer.ReasonLongerThanUsual, "current unavailable instances: %d", ln) - result = waitClusterUpdate - return result, nil + return nil } // Determine if any sets aren't at their desired replica count. Maybe we can // reuse the set objects from above but being paranoid for now. childrenSets, err = c.getChildStatefulSets(cluster) if err != nil { - return result, nil + return nil } // check if any pods inside the cluster need to be swapped in leavingInstanceID, podToReplace, err := c.checkPodsForReplacement(cluster, pods, placement) if err != nil { - return result, nil + return nil } if podToReplace != nil { err = c.replacePodInPlacement(cluster, placement, leavingInstanceID, podToReplace) if err != nil { c.recorder.WarningEvent(cluster, eventer.ReasonFailedToUpdate, "could not replace instance: "+leavingInstanceID) - return result, err + return err } c.recorder.NormalEvent(cluster, eventer.ReasonSuccessfulUpdate, "successfully replaced instance: "+leavingInstanceID) } @@ -535,16 +522,16 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu for _, set := range childrenSets { zone, ok := set.Labels[labels.IsolationGroup] if !ok { - return result, fmt.Errorf("statefulset %s has no isolation-group label", set.Name) + return fmt.Errorf("statefulset %s has no isolation-group label", set.Name) } group, ok := myspec.IsolationGroups(isoGroups).GetByName(zone) if !ok { - return result, fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups) + return fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups) } if set.Spec.Replicas == nil { - return result, fmt.Errorf("set %s has unset spec replica", set.Name) + return fmt.Errorf("set %s has unset spec replica", set.Name) } // Number of pods we want in the group. @@ -572,10 +559,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu // absent from the placement, add pods to placement. if inPlacement < current { setLogger.Info("expanding placement for set") - if err := c.expandPlacementForSet(cluster, set, group, placement); err != nil { - return result, err - } - return actionClusterUpdate, nil + return c.expandPlacementForSet(cluster, set, group, placement) } } @@ -583,10 +567,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu // trigger a remove so that we can shrink the set. 
if inPlacement > desired { setLogger.Info("remove instance from placement for set") - if err := c.shrinkPlacementForSet(cluster, set, placement); err != nil { - return result, err - } - return actionClusterUpdate, nil + return c.shrinkPlacementForSet(cluster, set, placement) } var newCount int32 @@ -600,15 +581,15 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu set.Spec.Replicas = pointer.Int32Ptr(newCount) set, err = c.kubeClient.AppsV1().StatefulSets(set.Namespace).Update(set) if err != nil { - return result, fmt.Errorf("error updating statefulset %s: %v", set.Name, err) + return fmt.Errorf("error updating statefulset %s: %v", set.Name, err) } - return actionClusterUpdate, nil + return nil } placement, err = c.adminClient.placementClientForCluster(cluster).Get() if err != nil { - return result, fmt.Errorf("error fetching placement: %v", err) + return fmt.Errorf("error fetching placement: %v", err) } // TODO(celina): possibly do a replacement check here @@ -616,7 +597,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu // See if we need to clean up the pod bootstrapping status. cluster, err = c.reconcileBootstrappingStatus(cluster, placement) if err != nil { - return result, fmt.Errorf("error reconciling bootstrap status: %v", err) + return fmt.Errorf("error reconciling bootstrap status: %v", err) } c.logger.Info("nothing to do", @@ -625,7 +606,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClu zap.Int64("generation", cluster.ObjectMeta.Generation), zap.String("rv", cluster.ObjectMeta.ResourceVersion)) - return noopClusterUpdate, nil + return nil } func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 77305c97..9b5617ea 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -544,7 +544,7 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { var done bool for i := 0; i < 5; i++ { - _, err := c.handleClusterUpdate(cluster) + err := c.handleClusterUpdate(cluster) require.NoError(t, err) expectedMu.Lock() From 58e16fc4565c315ce7107762c48b824ed427c49b Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 19 Jun 2019 23:41:54 -0400 Subject: [PATCH 4/4] fix bad err --- pkg/controller/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 11b1d32f..3306fab9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -501,13 +501,13 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error { // reuse the set objects from above but being paranoid for now. childrenSets, err = c.getChildStatefulSets(cluster) if err != nil { - return nil + return err } // check if any pods inside the cluster need to be swapped in leavingInstanceID, podToReplace, err := c.checkPodsForReplacement(cluster, pods, placement) if err != nil { - return nil + return err } if podToReplace != nil {
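Note for readers following the series: PATCH 1 replaces the old "if len(childrenSets) != len(isoGroups)" check, which could only ever append a new StatefulSet at the tail, with a lookup of existing sets by name, so sets missing at the head or middle of the sequence are recreated as well. Below is a minimal standalone sketch of that reconciliation under stated assumptions: the "<cluster>-rep<N>" naming is taken from the test fixtures, whereas the operator itself derives names via k8sops.StatefulSetName(cluster.Name, i).

package main

import "fmt"

// missingStatefulSets returns, in replica order, the StatefulSet names that a
// cluster with numGroups isolation groups should have but that do not yet
// exist. Indexing existing sets by name (rather than comparing counts, as the
// pre-patch code did) also catches gaps at the head or middle of the sequence.
func missingStatefulSets(clusterName string, numGroups int, existing map[string]bool) []string {
	var missing []string
	for i := 0; i < numGroups; i++ {
		// Assumed "<cluster>-rep<i>" form, per the test fixtures; the operator
		// itself calls k8sops.StatefulSetName(cluster.Name, i).
		name := fmt.Sprintf("%s-rep%d", clusterName, i)
		if !existing[name] {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	// Mirrors the "creates missing stateful sets at head and tail" test case.
	existing := map[string]bool{"cluster1-rep1": true}
	fmt.Println(missingStatefulSets("cluster1", 3, existing))
	// Prints: [cluster1-rep0 cluster1-rep2]
}

The controller still creates at most one missing set per reconciliation pass and then returns, relying on the next update event to continue, which is why the test drives handleClusterUpdate in a loop until every expected set has been observed.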
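PATCH 2 ("fix racy test") and PATCH 3 then drop both the recorded-action scanning and the handleClusterUpdateResult return value in favor of a reactor registered on the fake clientset, so StatefulSet creations are observed as they happen rather than replayed from the action log. A cut-down sketch of that reactor pattern follows, using only client-go's fake machinery; the test name, namespace, and assertion are illustrative rather than taken from the operator's suite, and the context-free Create call matches the client-go vintage this repo pins.

package controller_test

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubefake "k8s.io/client-go/kubernetes/fake"
	ktesting "k8s.io/client-go/testing"
)

// TestObserveStatefulSetCreates registers a create reactor on the fake
// clientset and records every StatefulSet name it sees.
func TestObserveStatefulSetCreates(t *testing.T) {
	kubeClient := kubefake.NewSimpleClientset()

	created := make(map[string]bool)
	kubeClient.PrependReactor("create", "statefulsets", func(action ktesting.Action) (bool, runtime.Object, error) {
		sts := action.(ktesting.CreateActionImpl).GetObject().(*appsv1.StatefulSet)
		created[sts.Name] = true
		// handled=true short-circuits the default object-tracker reaction.
		return true, sts, nil
	})

	// Exercise the client the way the controller would.
	_, err := kubeClient.AppsV1().StatefulSets("namespace").Create(&appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: "cluster1-rep0", Namespace: "namespace"},
	})
	if err != nil {
		t.Fatal(err)
	}
	if !created["cluster1-rep0"] {
		t.Fatal("expected reactor to observe the StatefulSet create")
	}
}

If the code under test does work on background goroutines, as the operator's controller does, guard the created map with a sync.Mutex, exactly as PATCH 2's version of the test does.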