Add support for parallel node updates within a statefulset #283

Merged: 9 commits, Mar 16, 2021
Changes from 4 commits
4 changes: 4 additions & 0 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
@@ -319,6 +319,10 @@ type ClusterSpec struct {
// SidecarVolumes is used to add any volumes that are required by sidecar
// containers.
SidecarVolumes []corev1.Volume `json:"sidecarVolumes,omitempty"`

// OnDeleteUpdateStrategy sets StatefulSets created by the operator to
// have OnDelete as the update strategy instead of RollingUpdate.
OnDeleteUpdateStrategy bool `json:"onDeleteUpdateStrategy,omitempty"`
}

// ExternalCoordinatorConfig defines parameters for using an external
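
The new OnDeleteUpdateStrategy field only selects the StatefulSet update strategy; a minimal sketch, assuming the wiring happens in the operator's statefulset generation code (which is not part of this diff), of how it might be honored:

	import (
		appsv1 "k8s.io/api/apps/v1"

		myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
	)

	// updateStrategyFor is a hedged sketch (not code from this PR) of how the
	// operator's statefulset generation might honor OnDeleteUpdateStrategy.
	func updateStrategyFor(cluster *myspec.M3DBCluster) appsv1.StatefulSetUpdateStrategy {
		if cluster.Spec.OnDeleteUpdateStrategy {
			// OnDelete: Kubernetes will not restart pods when the spec changes;
			// the operator (or an administrator) must delete pods for them to
			// pick up the new spec.
			return appsv1.StatefulSetUpdateStrategy{Type: appsv1.OnDeleteStatefulSetStrategyType}
		}
		// Default behavior: Kubernetes rolls pods automatically.
		return appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType}
	}
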
7 changes: 7 additions & 0 deletions pkg/apis/m3dboperator/v1alpha1/openapi_generated.go

Some generated files are not rendered by default.

288 changes: 242 additions & 46 deletions pkg/controller/controller.go
@@ -26,6 +26,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"sync"

"github.com/m3db/m3db-operator/pkg/apis/m3dboperator"
@@ -57,12 +58,12 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"

jsonpatch "github.com/evanphx/json-patch"
pkgerrors "github.com/pkg/errors"
"github.com/uber-go/tally"
"go.uber.org/zap"
"k8s.io/utils/pointer"
)

const (
@@ -315,7 +316,6 @@ func (c *M3DBController) processClusterQueueItem() bool {

return nil
}(obj)

if err != nil {
runtime.HandleError(err)
}
@@ -560,29 +560,46 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
return err
}

if !update {
// If we're not updating the statefulset AND we're not using the OnDelete update
// strategy, then move to the next statefulset. When using the OnDelete update
// strategy, we still may want to restart nodes for this particular statefulset,
// so don't continue yet.
if !update && !cluster.Spec.OnDeleteUpdateStrategy {
continue
}

_, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected)
if err != nil {
c.logger.Error(err.Error())
return err
// processNext indicates whether we should process the next statefulset in the loop.
// processNext will be true if the statefulset or pods for this statefulset
// are not updated. We do not process the next statefulset if we've performed any
// updates as we want to wait for those updates to be completed successfully first
// (i.e. new pods are in the ready state).
processNext := true
if update {
actual, err = c.applyStatefulSetUpdate(cluster, actual, expected)
if err != nil {
c.logger.Error(err.Error())
return err
}
processNext = false
Collaborator: Rather than have processNext = false here and continue this loop, can we just return nil so that we exit and re-trigger the entire reconciliation loop? That way we'll run all of our safety checks again.

Collaborator: AFAICT this would also match the old post-update behavior.

Collaborator Author: lol, yes, that's much simpler. Will update.

Collaborator Author (@nbroyles, Mar 11, 2021): Actually, wait. We can only do that if the updateStrategy is RollingUpdate. It's still probably nice to bail early when using RollingUpdate, so I'll modify for that.

Collaborator Author: OK, pushed a change to clarify this logic even more. It's worth noting that none of the semantics around this loop have changed here. In both the RollingUpdate and OnDelete cases, if changes are made to a statefulset, the operator exits this loop and relies on event updates (i.e. pod ready) to continue updating.
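
To summarize the behavior the thread converges on, a minimal sketch of the per-statefulset decision (an illustration only, not the final code from the later commits, which are not part of this 4-commit view):

	// processNextFor is a hedged illustration, not an operator method: should
	// the reconcile loop move on to the next statefulset, or stop and wait for
	// pod/statefulset events?
	func processNextFor(onDelete, specUpdated, podsRestarted bool) bool {
		processNext := true
		if specUpdated {
			// The statefulset spec changed; with RollingUpdate, k8s now rolls
			// the pods itself, so wait for readiness events.
			processNext = false
		}
		if onDelete {
			// With OnDelete the operator restarts pods itself; only wait if it
			// actually deleted any this pass.
			processNext = !podsRestarted
		}
		return processNext
	}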

}

c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)
// The OnDelete update strategy requires us to restart the statefulset nodes instead
// of k8s handling it for us, so do that if necessary.
if cluster.Spec.OnDeleteUpdateStrategy {
nodesUpdated, err := c.updateStatefulSetNodes(cluster, actual)
if err != nil {
c.logger.Error("error performing update",
zap.Error(err),
zap.String("namespace", cluster.Namespace),
zap.String("name", actual.Name))
return err
}
processNext = !nodesUpdated
}

if processNext {
continue
}

return nil
}
@@ -715,31 +732,13 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
}
setLogger.Info("resizing set, desired != current", zap.Int32("newSize", newCount))

setBytes, err := json.Marshal(set)
if err != nil {
if err = c.patchStatefulSet(set, func(set *appsv1.StatefulSet) {
set.Spec.Replicas = pointer.Int32Ptr(newCount)
}); err != nil {
c.logger.Error("error patching statefulset", zap.Error(err))
return err
}

set.Spec.Replicas = pointer.Int32Ptr(newCount)

setModifiedBytes, err := json.Marshal(set)
if err != nil {
return err
}

patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes)
if err != nil {
return err
}

set, err = c.kubeClient.
AppsV1().
StatefulSets(set.Namespace).
Patch(set.Name, types.MergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("error updating statefulset %s: %v", set.Name, err)
}

return nil
}

@@ -765,6 +764,186 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
return nil
}

func (c *M3DBController) patchStatefulSet(
set *appsv1.StatefulSet,
action func(set *appsv1.StatefulSet),
) error {
setBytes, err := json.Marshal(set)
if err != nil {
return err
}

action(set)

setModifiedBytes, err := json.Marshal(set)
if err != nil {
return err
}

patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes)
if err != nil {
return err
}

set, err = c.kubeClient.
AppsV1().
StatefulSets(set.Namespace).
Patch(set.Name, types.MergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("error updating statefulset %s: %w", set.Name, err)
}

return nil
}
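
As a point of reference, a small self-contained example of the merge-patch mechanics this helper relies on: evanphx/json-patch computes a minimal JSON merge patch from the before/after documents, so only changed fields are sent to the API server.

	package main

	import (
		"fmt"

		jsonpatch "github.com/evanphx/json-patch"
	)

	func main() {
		// Marshal the object before and after mutation, then diff: only the
		// fields that changed end up in the patch body.
		before := []byte(`{"spec":{"serviceName":"m3db","replicas":3}}`)
		after := []byte(`{"spec":{"serviceName":"m3db","replicas":5}}`)

		patch, err := jsonpatch.CreateMergePatch(before, after)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(patch)) // {"spec":{"replicas":5}}
	}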

func (c *M3DBController) applyStatefulSetUpdate(
cluster *myspec.M3DBCluster,
actual *appsv1.StatefulSet,
expected *appsv1.StatefulSet,
) (*appsv1.StatefulSet, error) {
updated, err := c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected)
if err != nil {
c.logger.Error(err.Error())
return nil, err
}

c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)
return updated, nil
}

// updateStatefulSetNodes returns true if it updates any pods
func (c *M3DBController) updateStatefulSetNodes(
Collaborator: nit: it would be nice to be consistent with nodes vs pods in the terminology.

Collaborator: I lean towards pods since this is k8s code.
cluster *myspec.M3DBCluster,
sts *appsv1.StatefulSet,
) (bool, error) {
logger := c.logger.With(
zap.String("namespace", cluster.Namespace), zap.String("name", sts.Name),
)

if _, ok := sts.Annotations[annotations.ParallelUpdateInProgress]; !ok {
logger.Debug("no update and no rollout in progress so move to next statefulset")
return false, nil
}

numPods, err := c.getMaxPodsToUpdate(sts)
if err != nil {
return false, err
}

if numPods == 0 {
return false, errors.New("parallel update annotation set to 0. will not perform pod updates")
}

pods, err := c.podsToUpdate(cluster.Namespace, sts, numPods)
if err != nil {
return false, err
}

if len(pods) > 0 {
names := make([]string, 0, len(pods))
for _, pod := range pods {
if err := c.kubeClient.CoreV1().
Pods(pod.Namespace).
Delete(pod.Name, &metav1.DeleteOptions{}); err != nil {
return false, err
}
names = append(names, pod.Name)
}
logger.Info("restarting pods", zap.Any("pods", names))
Collaborator: Nit: you can use zap.Strings here since names is a []string.

Collaborator Author: Nice, TIL!
return true, nil
}

// If there are no pods to update, we're fully rolled out so remove
// the update annotation.
if err = c.patchStatefulSet(sts, func(set *appsv1.StatefulSet) {
delete(set.Annotations, annotations.ParallelUpdateInProgress)

// NB(nate): K8s handles this for you when using the RollingUpdate update strategy.
// However, since OnDelete removes k8s from the pod update process, it's our
// responsibility to set this once done rolling out.
set.Status.CurrentReplicas = set.Status.UpdatedReplicas
set.Status.CurrentRevision = set.Status.UpdateRevision
Collaborator: This is surprising / also concerning to have to reconcile k8s' status for it. Are there docs somewhere on this?

If we have to do this, I would also prefer we use Update here rather than a patch. That way, if k8s has modified the statefulset between when we decided to change this field and when we made the patch, we'll get a conflict and have to re-reconcile.

Collaborator Author: Yeah, it's very unfortunate. I pieced it together from a bug report (kubernetes/kubernetes#73492) and looking at the update code (https://github.com/kubernetes/kubernetes/blob/053e7f4b0430e1f3814bb16bdb506cd59bb26593/pkg/controller/statefulset/stateful_set_utils.go#L377). But definitely let me know if I'm off base here.

Collaborator Author: Also, I was looking closer at the StatefulSetInterface and saw this method:

	UpdateStatus(*v1.StatefulSet) (*v1.StatefulSet, error)

What are your thoughts on this change then?

	sts.Status.CurrentReplicas = sts.Status.UpdatedReplicas
	sts.Status.CurrentRevision = sts.Status.UpdateRevision

	if sts, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).UpdateStatus(sts); err != nil {
		return false, err
	}

	if err = c.patchStatefulSet(sts, func(set *appsv1.StatefulSet) {
		delete(set.Annotations, annotations.ParallelUpdateInProgress)
	}); err != nil {
		return false, err
	}

Would that also generate a conflict, since it's no longer patching for the revision update? Also, it seems safe to patch to remove the annotation, but correct me if I'm wrong.

Collaborator: This SGTM 👍.
}); err != nil {
return false, err
}
logger.Info("update complete")

return false, nil
}

func (c *M3DBController) getMaxPodsToUpdate(actual *appsv1.StatefulSet) (int, error) {
Collaborator: Can this be a free function rather than a method on *M3DBController?
var (
rawVal string
ok bool
)
if rawVal, ok = actual.Annotations[annotations.ParallelUpdateInProgress]; !ok {
return 0, errors.New("parallel update annotation missing during statefulset update")
}

var (
maxPodsPerUpdate int
err error
)
if maxPodsPerUpdate, err = strconv.Atoi(rawVal); err != nil {
return 0, fmt.Errorf("failed to parse parallel update annotation: %v", rawVal)
}

return maxPodsPerUpdate, nil
}

func (c *M3DBController) podsToUpdate(
namespace string,
sts *appsv1.StatefulSet,
numPods int,
) ([]*corev1.Pod, error) {
currRev := sts.Status.CurrentRevision
if currRev == "" {
return nil, errors.New("currentRevision empty")
} else if currRev == sts.Status.UpdateRevision {
// No pods to update because current and update revision are the same
return nil, nil
}

label := map[string]string{
"controller-revision-hash": sts.Status.CurrentRevision,
}
pods, err := c.podLister.Pods(namespace).List(klabels.Set(label).AsSelector())
if err != nil {
return nil, err
} else if len(pods) == 0 {
return nil, nil
}

// NB(nate): Sort here so updates are always done in a consistent order.
// Statefulset 0 -> N: Pod 0 -> N
sortedPods, err := sortPods(pods)
if err != nil {
return nil, err
}

toUpdate := make([]*corev1.Pod, 0, len(sortedPods))
for _, pod := range sortedPods {
toUpdate = append(toUpdate, pod.pod)
if len(toUpdate) == numPods {
break
}
}

return toUpdate, nil
}
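
sortPods is defined elsewhere in the controller and is not part of this diff; below is a plausible self-contained sketch of the ordinal-based ordering it implies (the type and function names here are assumptions, not the repo's actual helpers).

	import (
		"fmt"
		"sort"
		"strconv"
		"strings"

		corev1 "k8s.io/api/core/v1"
	)

	// podWithOrdinal pairs a pod with the ordinal parsed from its name
	// ("cluster-rep0-2" -> 2). Hypothetical; the real helper may differ.
	type podWithOrdinal struct {
		pod     *corev1.Pod
		ordinal int
	}

	func sortPodsByOrdinal(pods []*corev1.Pod) ([]podWithOrdinal, error) {
		out := make([]podWithOrdinal, 0, len(pods))
		for _, p := range pods {
			idx := strings.LastIndex(p.Name, "-")
			if idx < 0 || idx == len(p.Name)-1 {
				return nil, fmt.Errorf("pod name %q has no ordinal suffix", p.Name)
			}
			ord, err := strconv.Atoi(p.Name[idx+1:])
			if err != nil {
				return nil, err
			}
			out = append(out, podWithOrdinal{pod: p, ordinal: ord})
		}
		// Statefulset pods are updated 0 -> N, matching the comment above.
		sort.Slice(out, func(i, j int) bool { return out[i].ordinal < out[j].ordinal })
		return out, nil
	}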

func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance {
insts := []m3placement.Instance{}
for _, inst := range pl.Instances() {
@@ -876,7 +1055,6 @@ func (c *M3DBController) processPodQueueItem() bool {

return nil
}(obj)

if err != nil {
runtime.HandleError(err)
}
@@ -1039,7 +1217,16 @@ func updatedStatefulSet(
// The operator will only perform an update if the current StatefulSet has been
// annotated to indicate that it is okay to update it.
if val, ok := actual.Annotations[annotations.Update]; !ok || val != annotations.EnabledVal {
return nil, false, nil
str, ok := actual.Annotations[annotations.ParallelUpdate]
if !ok {
return nil, false, nil
}

if parallelVal, err := strconv.Atoi(str); err != nil {
return nil, false, err
} else if parallelVal < 1 {
return nil, false, fmt.Errorf("parallel update value invalid: %v", str)
}
}

expected, err := m3db.GenerateStatefulSet(cluster, isoGroup.Name, isoGroup.NumInstances)
@@ -1067,11 +1254,12 @@ func updatedStatefulSet(
}

// If we don't need to perform an update to the StatefulSet's spec, but the StatefulSet
// has the update annotation, we'll still update the StatefulSet to remove the update
// annotation. This ensures that users can always set the update annotation and then
// has an update annotation, we'll still update the StatefulSet to remove the update
// annotation. This ensures that users can always set an update annotation and then
// wait for it to be removed to know if the operator has processed a StatefulSet.
if !update {
delete(actual.Annotations, annotations.Update)
delete(actual.Annotations, annotations.ParallelUpdate)
return actual, true, nil
}

@@ -1098,6 +1286,14 @@ func copyAnnotations(expected, actual *appsv1.StatefulSet) {
continue
}

// For parallel updates, remove the initial annotation added by the client and add rollout
// in progress annotation. This will ensure that in future passes we don't attempt to
// update the statefulset again unnecessarily and simply roll pods that need to pick up
// updates.
if k == annotations.ParallelUpdate {
expected.Annotations[annotations.ParallelUpdateInProgress] = v
}

if _, ok := expected.Annotations[k]; !ok {
expected.Annotations[k] = v
}
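
Taken together with the annotation handling above, a hedged usage sketch of how a client could request a parallel rollout. The annotations import path and the client-side workflow are assumptions; only the annotations.ParallelUpdate identifier itself appears in this diff.

	import (
		"strconv"

		"github.com/m3db/m3db-operator/pkg/k8sops/annotations" // import path assumed
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/kubernetes"
	)

	// requestParallelUpdate marks a statefulset so the operator restarts up to
	// maxPods pods at a time on subsequent reconcile passes. Sketch only.
	func requestParallelUpdate(kube kubernetes.Interface, namespace, name string, maxPods int) error {
		sts, err := kube.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if sts.Annotations == nil {
			sts.Annotations = map[string]string{}
		}
		// The operator validates that this value parses and is >= 1, then swaps
		// it for the in-progress annotation while the rollout proceeds.
		sts.Annotations[annotations.ParallelUpdate] = strconv.Itoa(maxPods)
		_, err = kube.AppsV1().StatefulSets(namespace).Update(sts)
		return err
	}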