diff --git a/pkg/apis/m3dboperator/v1alpha1/cluster.go b/pkg/apis/m3dboperator/v1alpha1/cluster.go index 6f129096..a4ecbf0d 100644 --- a/pkg/apis/m3dboperator/v1alpha1/cluster.go +++ b/pkg/apis/m3dboperator/v1alpha1/cluster.go @@ -319,6 +319,10 @@ type ClusterSpec struct { // SidecarVolumes is used to add any volumes that are required by sidecar // containers. SidecarVolumes []corev1.Volume `json:"sidecarVolumes,omitempty"` + + // OnDeleteUpdateStrategy sets StatefulSets created by the operator to + // have OnDelete as the update strategy instead of RollingUpdate. + OnDeleteUpdateStrategy bool `json:"onDeleteUpdateStrategy,omitempty"` } // ExternalCoordinatorConfig defines parameters for using an external diff --git a/pkg/apis/m3dboperator/v1alpha1/openapi_generated.go b/pkg/apis/m3dboperator/v1alpha1/openapi_generated.go index 1fc3420d..5755f181 100644 --- a/pkg/apis/m3dboperator/v1alpha1/openapi_generated.go +++ b/pkg/apis/m3dboperator/v1alpha1/openapi_generated.go @@ -650,6 +650,13 @@ func schema_pkg_apis_m3dboperator_v1alpha1_ClusterSpec(ref common.ReferenceCallb }, }, }, + "onDeleteUpdateStrategy": { + SchemaProps: spec.SchemaProps{ + Description: "OnDeleteUpdateStrategy sets StatefulSets created by the operator to have OnDelete as the update strategy instead of RollingUpdate.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, Required: []string{"parallelPodManagement"}, }, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 11809831..600bed84 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -26,6 +26,7 @@ import ( "fmt" "reflect" "sort" + "strconv" "sync" "github.com/m3db/m3db-operator/pkg/apis/m3dboperator" @@ -48,6 +49,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" klabels "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" @@ -57,12 +59,12 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/pointer" jsonpatch "github.com/evanphx/json-patch" pkgerrors "github.com/pkg/errors" "github.com/uber-go/tally" "go.uber.org/zap" - "k8s.io/utils/pointer" ) const ( @@ -315,7 +317,6 @@ func (c *M3DBController) processClusterQueueItem() bool { return nil }(obj) - if err != nil { runtime.HandleError(err) } @@ -521,7 +522,8 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error replicas := *sts.Spec.Replicas ready := replicas == sts.Status.ReadyReplicas - if sts.Status.UpdateRevision != "" { + if sts.Status.UpdateRevision != "" && + sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType { // If there is an update revision, ensure all pods are updated // otherwise there is a rollout in progress. 
// Note: This ensures there is no race condition between @@ -536,6 +538,7 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error if !ready { c.logger.Info("waiting for statefulset to be ready", + zap.String("namespace", sts.Namespace), zap.String("name", sts.Name), zap.Int32("replicas", replicas), zap.Int32("readyReplicas", sts.Status.ReadyReplicas), @@ -560,31 +563,48 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error return err } - if !update { + // If we're not updating the statefulset AND we're not using the OnDelete update + // strategy, then move to the next statefulset. When using the OnDelete update + // strategy, we still may want to restart nodes for this particular statefulset, + // so don't continue yet. + onDeleteUpdateStrategy := + actual.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType + if !update && !onDeleteUpdateStrategy { continue } - _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected) - if err != nil { - c.logger.Error(err.Error()) - return err + if update { + _, err = c.applyStatefulSetUpdate(cluster, actual, expected) + if err != nil { + c.logger.Error(err.Error()) + return err + } + return nil } - c.logger.Info("updated statefulset", - zap.String("name", expected.Name), - zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas), - zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas), - zap.String("actual_currentRevision", actual.Status.CurrentRevision), - zap.String("actual_updateRevision", actual.Status.UpdateRevision), - zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas), - zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas), - zap.String("expected_currentRevision", expected.Status.CurrentRevision), - zap.String("expected_updateRevision", expected.Status.UpdateRevision), - zap.Int64("generation", expected.Generation), - zap.Int64("observed", expected.Status.ObservedGeneration), - ) + // Using an OnDelete strategy, we have to update nodes if: + // - a statefulset update just happened + // - we're already in the middle of a rollout + // * because nodes are rolled out in chunks this can happen in many iterations + // Therefore, always check to see if pods need to be updated and return from this loop + // if the statefulset or pods were updated. If a rollout is finished or there has not + // been a change, this call is a no-op. + if onDeleteUpdateStrategy { + nodesUpdated, err := c.updateStatefulSetPods(cluster, actual) + if err != nil { + c.logger.Error("error performing update", + zap.Error(err), + zap.String("namespace", cluster.Namespace), + zap.String("name", actual.Name)) + return err + } - return nil + // If we've performed any updates at all, do not process the next statefulset. + // Wait for the updated pods to become healthy. 
+ if nodesUpdated { + return nil + } + } } if !cluster.Status.HasInitializedPlacement() { @@ -715,31 +735,13 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error } setLogger.Info("resizing set, desired != current", zap.Int32("newSize", newCount)) - setBytes, err := json.Marshal(set) - if err != nil { + if err = c.patchStatefulSet(set, func(set *appsv1.StatefulSet) { + set.Spec.Replicas = pointer.Int32Ptr(newCount) + }); err != nil { + c.logger.Error("error patching statefulset", zap.Error(err)) return err } - set.Spec.Replicas = pointer.Int32Ptr(newCount) - - setModifiedBytes, err := json.Marshal(set) - if err != nil { - return err - } - - patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes) - if err != nil { - return err - } - - set, err = c.kubeClient. - AppsV1(). - StatefulSets(set.Namespace). - Patch(set.Name, types.MergePatchType, patchBytes) - if err != nil { - return fmt.Errorf("error updating statefulset %s: %v", set.Name, err) - } - return nil } @@ -765,6 +767,216 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error return nil } +func (c *M3DBController) patchStatefulSet( + set *appsv1.StatefulSet, + action func(set *appsv1.StatefulSet), +) error { + setBytes, err := json.Marshal(set) + if err != nil { + return err + } + + action(set) + + setModifiedBytes, err := json.Marshal(set) + if err != nil { + return err + } + + patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes) + if err != nil { + return err + } + + set, err = c.kubeClient. + AppsV1(). + StatefulSets(set.Namespace). + Patch(set.Name, types.MergePatchType, patchBytes) + if err != nil { + return fmt.Errorf("error updating statefulset %s: %w", set.Name, err) + } + + return nil +} + +func (c *M3DBController) applyStatefulSetUpdate( + cluster *myspec.M3DBCluster, + actual *appsv1.StatefulSet, + expected *appsv1.StatefulSet, +) (*appsv1.StatefulSet, error) { + updated, err := c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected) + if err != nil { + c.logger.Error(err.Error()) + return nil, err + } + + c.logger.Info("updated statefulset", + zap.String("name", expected.Name), + zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas), + zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas), + zap.String("actual_currentRevision", actual.Status.CurrentRevision), + zap.String("actual_updateRevision", actual.Status.UpdateRevision), + zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas), + zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas), + zap.String("expected_currentRevision", expected.Status.CurrentRevision), + zap.String("expected_updateRevision", expected.Status.UpdateRevision), + zap.Int64("generation", expected.Generation), + zap.Int64("observed", expected.Status.ObservedGeneration), + ) + return updated, nil +} + +// updateStatefulSetPods returns true if it updates any pods +func (c *M3DBController) updateStatefulSetPods( + cluster *myspec.M3DBCluster, + sts *appsv1.StatefulSet, +) (bool, error) { + logger := c.logger.With( + zap.String("namespace", cluster.Namespace), zap.String("name", sts.Name), + ) + + if _, ok := sts.Annotations[annotations.ParallelUpdateInProgress]; !ok { + logger.Debug("no update and no rollout in progress so move to next statefulset") + return false, nil + } + + numPods, err := getMaxPodsToUpdate(sts) + if err != nil { + return false, err + } + + if numPods == 0 { + return false, errors.New("parallel update annotation set to 0. 
will not perform pod updates") + } + + pods, err := c.podsToUpdate(cluster.Namespace, sts, numPods) + if err != nil { + return false, err + } + + if len(pods) > 0 { + names := make([]string, 0, len(pods)) + for _, pod := range pods { + if err := c.kubeClient.CoreV1(). + Pods(pod.Namespace). + Delete(pod.Name, &metav1.DeleteOptions{}); err != nil { + return false, err + } + names = append(names, pod.Name) + } + logger.Info("restarting pods", zap.Strings("pods", names)) + return true, nil + } + + // If there are no pods to update, we're fully rolled out so remove + // the update annotation and update status. + // + // NB(nate): K8s handles this for you when using the RollingUpdate update strategy. + // However, since OnDelete removes k8s from the pod update process, it's our + // responsibility to set this once done rolling out. + sts.Status.CurrentReplicas = sts.Status.UpdatedReplicas + sts.Status.CurrentRevision = sts.Status.UpdateRevision + + if sts, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).UpdateStatus(sts); err != nil { + return false, err + } + + if err = c.patchStatefulSet(sts, func(set *appsv1.StatefulSet) { + delete(set.Annotations, annotations.ParallelUpdateInProgress) + }); err != nil { + return false, err + } + logger.Info("update complete") + + return false, nil +} + +func getMaxPodsToUpdate(actual *appsv1.StatefulSet) (int, error) { + var ( + rawVal string + ok bool + ) + if rawVal, ok = actual.Annotations[annotations.ParallelUpdateInProgress]; !ok { + return 0, errors.New("parallel update annotation missing during statefulset update") + } + + var ( + maxPodsPerUpdate int + err error + ) + if maxPodsPerUpdate, err = strconv.Atoi(rawVal); err != nil { + return 0, fmt.Errorf("failed to parse parallel update annotation: %v", rawVal) + } + + return maxPodsPerUpdate, nil +} + +func (c *M3DBController) podsToUpdate( + namespace string, + sts *appsv1.StatefulSet, + numPods int, +) ([]*corev1.Pod, error) { + var ( + currRev = sts.Status.CurrentRevision + updateRev = sts.Status.UpdateRevision + ) + // nolint:gocritic + if currRev == "" { + return nil, errors.New("currentRevision empty") + } else if updateRev == "" { + return nil, errors.New("updateRevision empty") + } else if currRev == updateRev { + // No pods to update because current and update revision are the same + return nil, nil + } + + // Get any pods not on the updateRevision for this statefulset + podSelector, err := generatePodSelector(updateRev, sts) + if err != nil { + return nil, err + } + pods, err := c.podLister.Pods(namespace).List(podSelector) + if err != nil { + return nil, err + } else if len(pods) == 0 { + return nil, nil + } + + // NB(nate): Sort here so updates are always done in a consistent order. 
+ // Statefulset 0 -> N: Pod 0 -> N + sortedPods, err := sortPods(pods) + if err != nil { + return nil, err + } + + toUpdate := make([]*corev1.Pod, 0, len(sortedPods)) + for _, pod := range sortedPods { + toUpdate = append(toUpdate, pod.pod) + if len(toUpdate) == numPods { + break + } + } + + return toUpdate, nil +} + +func generatePodSelector(updateRev string, sts *appsv1.StatefulSet) (klabels.Selector, error) { + revReq, err := klabels.NewRequirement( + "controller-revision-hash", selection.NotEquals, []string{updateRev}, + ) + if err != nil { + return nil, err + } + stsReq, err := klabels.NewRequirement( + labels.StatefulSet, selection.Equals, []string{sts.Name}, + ) + if err != nil { + return nil, err + } + podSelector := klabels.NewSelector().Add(*revReq, *stsReq) + return podSelector, nil +} + func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance { insts := []m3placement.Instance{} for _, inst := range pl.Instances() { @@ -876,7 +1088,6 @@ func (c *M3DBController) processPodQueueItem() bool { return nil }(obj) - if err != nil { runtime.HandleError(err) } @@ -1039,7 +1250,16 @@ func updatedStatefulSet( // The operator will only perform an update if the current StatefulSet has been // annotated to indicate that it is okay to update it. if val, ok := actual.Annotations[annotations.Update]; !ok || val != annotations.EnabledVal { - return nil, false, nil + str, ok := actual.Annotations[annotations.ParallelUpdate] + if !ok { + return nil, false, nil + } + + if parallelVal, err := strconv.Atoi(str); err != nil { + return nil, false, err + } else if parallelVal < 1 { + return nil, false, fmt.Errorf("parallel update value invalid: %v", str) + } } expected, err := m3db.GenerateStatefulSet(cluster, isoGroup.Name, isoGroup.NumInstances) @@ -1067,11 +1287,12 @@ func updatedStatefulSet( } // If we don't need to perform an update to the StatefulSet's spec, but the StatefulSet - // has the update annotation, we'll still update the StatefulSet to remove the update - // annotation. This ensures that users can always set the update annotation and then + // has an update annotation, we'll still update the StatefulSet to remove the update + // annotation. This ensures that users can always set an update annotation and then // wait for it to be removed to know if the operator has processed a StatefulSet. if !update { delete(actual.Annotations, annotations.Update) + delete(actual.Annotations, annotations.ParallelUpdate) return actual, true, nil } @@ -1098,6 +1319,15 @@ func copyAnnotations(expected, actual *appsv1.StatefulSet) { continue } + // For parallel updates, remove the initial annotation added by the client and add rollout + // in progress annotation. This will ensure that in future passes we don't attempt to + // update the statefulset again unnecessarily and simply roll pods that need to pick up + // updates. 
+ if k == annotations.ParallelUpdate { + expected.Annotations[annotations.ParallelUpdateInProgress] = v + continue + } + if _, ok := expected.Annotations[k]; !ok { expected.Annotations[k] = v } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2e4ffb35..a51529ea 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -79,16 +79,19 @@ func setupTestCluster( t *testing.T, clusterMeta metav1.ObjectMeta, sets []*metav1.ObjectMeta, + pods []*corev1.Pod, replicationFactor int, + numInstances int32, + onDeleteUpdateStrategy bool, ) (*myspec.M3DBCluster, *testDeps) { - const numInstances = 1 cfgMapName := defaultConfigMapName cluster := &myspec.M3DBCluster{ ObjectMeta: clusterMeta, Spec: myspec.ClusterSpec{ - Image: defaultTestImage, - ReplicationFactor: int32(replicationFactor), - ConfigMapName: &cfgMapName, + Image: defaultTestImage, + ReplicationFactor: int32(replicationFactor), + ConfigMapName: &cfgMapName, + OnDeleteUpdateStrategy: onDeleteUpdateStrategy, }, } cluster.ObjectMeta.UID = "abcd" @@ -129,6 +132,11 @@ func setupTestCluster( } } + for _, pod := range pods { + // nolint:makezero + objects = append(objects, runtime.Object(pod)) + } + deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{ &myspec.M3DBCluster{ @@ -147,11 +155,13 @@ type waitForStatefulSetsOptions struct { simulatePodsNotUpdated bool simulateStaleGeneration bool expectError string + currentRevision string } type waitForStatefulSetsResult struct { updatedStatefulSets []string failedUpdateStatefulSets []string + podUpdateGroups [][]string } func waitForStatefulSets( @@ -162,9 +172,10 @@ func waitForStatefulSets( opts waitForStatefulSetsOptions, ) (waitForStatefulSetsResult, bool) { var ( - mu sync.Mutex - updates int - actualSets = make(map[string]bool) + mu sync.Mutex + updates int + actualSets = make(map[string]bool) + updatedPods []string ) controller.kubeClient.(*kubefake.Clientset).PrependReactor(verb, "statefulsets", func(action ktesting.Action) (bool, runtime.Object, error) { @@ -178,6 +189,7 @@ func waitForStatefulSets( // replicas check is enforced. updates++ sts.Status.UpdateRevision = fmt.Sprintf("updated-revision-%d", updates) + sts.Status.CurrentRevision = fmt.Sprintf("%s-%s", sts.Name, opts.currentRevision) if !opts.simulatePodsNotUpdated { // Simulate all update immediately, unless explicitly // testing for when not updating them. @@ -206,12 +218,23 @@ func waitForStatefulSets( return false, nil, nil }) + controller.kubeClient.(*kubefake.Clientset).PrependReactor( + "delete", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + podName := action.(kubetesting.DeleteActionImpl).GetName() + mu.Lock() + updatedPods = append(updatedPods, podName) + mu.Unlock() + + return false, nil, nil + }) + // Iterate through the expected stateful sets twice (or at least 5 times) to make sure // we see all stateful sets that we expect and also be able to catch any extra stateful // sets that we don't. 
var ( - iters = math.Max(float64(2*len(opts.expectedStatefulSets)), 5) - finalErr error + iters = math.Max(float64(2*len(opts.expectedStatefulSets)), 10) + finalErr error + podUpdateGroups [][]string ) for i := 0; i < int(iters); i++ { err := controller.handleClusterUpdate(cluster) @@ -228,14 +251,15 @@ func waitForStatefulSets( seen++ } } + if len(updatedPods) > 0 { + podUpdateGroups = append(podUpdateGroups, updatedPods) + updatedPods = nil + } mu.Unlock() if seen != len(opts.expectedStatefulSets) { time.Sleep(100 * time.Millisecond) - continue } - - break } mu.Lock() @@ -276,6 +300,7 @@ func waitForStatefulSets( return waitForStatefulSetsResult{ updatedStatefulSets: updated, failedUpdateStatefulSets: failed, + podUpdateGroups: podUpdateGroups, }, done } @@ -690,7 +715,9 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - cluster, deps := setupTestCluster(t, *test.cluster, test.sets, test.replicationFactor) + cluster, deps := setupTestCluster( + t, *test.cluster, test.sets, nil, test.replicationFactor, 1, false, + ) defer deps.cleanup() c := deps.newController(t) @@ -866,31 +893,11 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { const replicas = 3 - cluster, deps := setupTestCluster(t, *test.cluster, test.sets, replicas) + cluster, deps := setupTestCluster(t, *test.cluster, test.sets, nil, replicas, 1, false) defer deps.cleanup() c := deps.newController(t) - deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{ - Registry: &namespacepb.Registry{}, - }, nil) - - var mockInstances []placement.Instance - for i := 0; i < replicas; i++ { - inst := placement.NewMockInstance(deps.mockController) - - // Since the controller checks all instances are available in the placement after - // checking and performing any updates, and we're only concerned with the latter - // in this test, configure the mock instances to be unavailable to reduce the - // amount of mocks we need to set up. 
- inst.EXPECT().IsAvailable().AnyTimes().Return(false) - inst.EXPECT().ID().AnyTimes().Return(strconv.Itoa(i)) - mockInstances = append(mockInstances, inst) - } - - mockPlacement := placement.NewMockPlacement(deps.mockController) - mockPlacement.EXPECT().NumInstances().AnyTimes().Return(len(mockInstances)) - mockPlacement.EXPECT().Instances().AnyTimes().Return(mockInstances) - deps.placementClient.EXPECT().Get().AnyTimes().Return(mockPlacement, nil) + mockPlacement(deps, replicas) if test.newImage != "" { cluster.Spec.Image = test.newImage @@ -925,8 +932,118 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) { } } -func TestHandleUpdateClusterFrozen(t *testing.T) { +func TestHandleUpdateClusterOnDeleteStrategy(t *testing.T) { + var ( + replicas int32 = 3 + currentRevision = "current-revision" + clusterName = "cluster1" + newImage = "m3db:v2.0.0" + baseLabels = map[string]string{ + "operator.m3db.io/app": "m3db", + "operator.m3db.io/cluster": "cluster", + } + rawCluster = newMeta(clusterName, baseLabels, nil) + ) + tests := []struct { + name string + sets []*metav1.ObjectMeta + pods []*corev1.Pod + expUpdateStatefulSets []string + expPodUpdateGroups [][]string + updateVal int + }{ + { + name: "update all nodes at once", + sets: generateSets(clusterName, replicas, "3", true), + pods: generatePods(clusterName, replicas, 3, currentRevision), + expUpdateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1", "cluster1-rep2"}, + expPodUpdateGroups: [][]string{ + {"cluster1-rep0-0", "cluster1-rep0-1", "cluster1-rep0-2"}, + {"cluster1-rep1-0", "cluster1-rep1-1", "cluster1-rep1-2"}, + {"cluster1-rep2-0", "cluster1-rep2-1", "cluster1-rep2-2"}, + }, + updateVal: 3, + }, + { + name: "update some nodes, not evenly divisible", + sets: generateSets(clusterName, replicas, "2", true), + pods: generatePods(clusterName, replicas, 3, currentRevision), + expUpdateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1", "cluster1-rep2"}, + expPodUpdateGroups: [][]string{ + {"cluster1-rep0-0", "cluster1-rep0-1"}, + {"cluster1-rep0-2"}, + {"cluster1-rep1-0", "cluster1-rep1-1"}, + {"cluster1-rep1-2"}, + {"cluster1-rep2-0", "cluster1-rep2-1"}, + {"cluster1-rep2-2"}, + }, + updateVal: 2, + }, + { + name: "update one at a time", + sets: generateSets(clusterName, replicas, "1", true), + pods: generatePods(clusterName, replicas, 2, currentRevision), + expUpdateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1", "cluster1-rep2"}, + expPodUpdateGroups: [][]string{ + {"cluster1-rep0-0"}, + {"cluster1-rep0-1"}, + {"cluster1-rep1-0"}, + {"cluster1-rep1-1"}, + {"cluster1-rep2-0"}, + {"cluster1-rep2-1"}, + }, + updateVal: 1, + }, + { + name: "only update statefulset with annotation", + sets: []*metav1.ObjectMeta{ + newMeta("cluster1-rep0", nil, map[string]string{ + annotations.ParallelUpdate: "1", + }), + newMeta("cluster1-rep1", nil, nil), + newMeta("cluster1-rep2", nil, nil), + }, + pods: generatePods(clusterName, replicas, 2, currentRevision), + expUpdateStatefulSets: []string{"cluster1-rep0"}, + expPodUpdateGroups: [][]string{ + {"cluster1-rep0-0"}, + {"cluster1-rep0-1"}, + }, + updateVal: 1, + }, + { + name: "no update annotation, do nothing", + sets: generateSets(clusterName, replicas, "3", false), + pods: generatePods(clusterName, replicas, 3, currentRevision), + expUpdateStatefulSets: []string{}, + updateVal: 3, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + nodes := int32(len(test.pods) / len(test.sets)) + cluster, deps := setupTestCluster( + t, *rawCluster, 
test.sets, test.pods, int(replicas), nodes, true, + ) + defer deps.cleanup() + c := deps.newController(t) + + mockPlacement(deps, replicas) + + cluster.Spec.Image = newImage + + result, done := waitForStatefulSets(t, c, cluster, "update", waitForStatefulSetsOptions{ + setReadyReplicas: true, + expectedStatefulSets: test.expUpdateStatefulSets, + currentRevision: currentRevision, + }) + assert.True(t, done, "expected all sets to be updated") + assert.Equal(t, test.expPodUpdateGroups, result.podUpdateGroups) + }) + } +} +func TestHandleUpdateClusterFrozen(t *testing.T) { var ( clusterMeta = newMeta("cluster1", map[string]string{ "foo": "bar", @@ -937,7 +1054,7 @@ func TestHandleUpdateClusterFrozen(t *testing.T) { newMeta("cluster1-rep0", nil, nil), } ) - cluster, deps := setupTestCluster(t, *clusterMeta, sets, 3) + cluster, deps := setupTestCluster(t, *clusterMeta, sets, nil, 3, 1, false) defer deps.cleanup() controller := deps.newController(t) @@ -957,3 +1074,66 @@ func TestHandleUpdateClusterFrozen(t *testing.T) { assert.Equal(t, int64(0), count.Load()) } + +func generateSets( + // nolint:unparam + clusterName string, + rf int32, + updateVal string, + withAnnotation bool, +) []*metav1.ObjectMeta { + var sets []*metav1.ObjectMeta + for i := 0; i < int(rf); i++ { + var ann map[string]string + if withAnnotation { + ann = map[string]string{ + annotations.ParallelUpdate: updateVal, + } + } + set := newMeta(fmt.Sprintf("%s-rep%d", clusterName, i), nil, ann) + sets = append(sets, set) + } + + return sets +} + +// nolint:unparam +func generatePods(clusterName string, rf int32, nodes int32, revision string) []*corev1.Pod { + var pods []*corev1.Pod + for i := 0; i < int(rf); i++ { + for j := 0; j < int(nodes); j++ { + pods = append(pods, &corev1.Pod{ + ObjectMeta: *newMeta(fmt.Sprintf("%s-rep%d-%d", clusterName, i, j), map[string]string{ + "controller-revision-hash": fmt.Sprintf("%s-rep%d-%s", clusterName, i, revision), + "operator.m3db.io/stateful-set": fmt.Sprintf("%s-rep%d", clusterName, i), + }, nil), + }) + } + } + + return pods +} + +func mockPlacement(deps *testDeps, replicas int32) { + deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{ + Registry: &namespacepb.Registry{}, + }, nil) + + var mockInstances []placement.Instance + for i := 0; i < int(replicas); i++ { + inst := placement.NewMockInstance(deps.mockController) + + // Since the controller checks all instances are available in the placement after + // checking and performing any updates, and we're only concerned with the latter + // in this test, configure the mock instances to be unavailable to reduce the + // amount of mocks we need to set up. + inst.EXPECT().IsAvailable().AnyTimes().Return(false) + inst.EXPECT().ID().AnyTimes().Return(strconv.Itoa(i)) + mockInstances = append(mockInstances, inst) + } + + mockPlacement := placement.NewMockPlacement(deps.mockController) + mockPlacement.EXPECT().NumInstances().AnyTimes().Return(len(mockInstances)) + mockPlacement.EXPECT().Instances().AnyTimes().Return(mockInstances) + deps.placementClient.EXPECT().Get().AnyTimes().Return(mockPlacement, nil) +} diff --git a/pkg/k8sops/annotations/annotations.go b/pkg/k8sops/annotations/annotations.go index b625f64d..6544064c 100644 --- a/pkg/k8sops/annotations/annotations.go +++ b/pkg/k8sops/annotations/annotations.go @@ -37,7 +37,12 @@ const ( // Update is the annotation used by the operator to determine if a StatefulSet is // allowed to be updated. 
Update = "operator.m3db.io/update" - + // ParallelUpdate is the annotation used by the operator to determine if a StatefulSet + // is allowed to be updated in parallel. + ParallelUpdate = "operator.m3db.io/parallel-update" + // ParallelUpdateInProgress is the annotation used by the operator indicate a parallel update + // is underway. This annotation should only be used by the operator. + ParallelUpdateInProgress = "operator.m3db.io/parallel-update-in-progress" // EnabledVal is the value that indicates an annotation is enabled. EnabledVal = "enabled" ) diff --git a/pkg/k8sops/m3db/generators_test.go b/pkg/k8sops/m3db/generators_test.go index 77160d22..b8833a3f 100644 --- a/pkg/k8sops/m3db/generators_test.go +++ b/pkg/k8sops/m3db/generators_test.go @@ -274,6 +274,9 @@ func TestGenerateStatefulSet(t *testing.T) { ServiceAccountName: "m3db-account1", }, }, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + }, VolumeClaimTemplates: []v1.PersistentVolumeClaim{ { ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/k8sops/m3db/statefulset.go b/pkg/k8sops/m3db/statefulset.go index 1cace5c9..889e1776 100644 --- a/pkg/k8sops/m3db/statefulset.go +++ b/pkg/k8sops/m3db/statefulset.go @@ -184,6 +184,13 @@ func NewBaseStatefulSet(ssName, isolationGroup string, cluster *myspec.M3DBClust stsSpec.PodManagementPolicy = appsv1.ParallelPodManagement } + if cluster.Spec.OnDeleteUpdateStrategy { + stsSpec.UpdateStrategy.Type = appsv1.OnDeleteStatefulSetStrategyType + } else { + // NB(nate); This is the default, but set anyway out of a healthy paranoia. + stsSpec.UpdateStrategy.Type = appsv1.RollingUpdateStatefulSetStrategyType + } + return &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: ssName,